package main import ( // External werr "git.gibonuddevalla.se/go/wrappederror" "github.com/jmoiron/sqlx" // Standard "bytes" "database/sql" "encoding/json" "fmt" "os/exec" "strings" "time" ) const ENV_NAME = 0 const SCRIPT_NAME = 1 type ScriptScheduler struct { EventQueue chan string } type ScriptExecution struct { ID int TimeStart sql.NullTime `db:"time_start"` TimeEnd sql.NullTime `db:"time_end"` ScriptName string `db:"script_name"` Source string Data string SSH string Env string OutputStdout sql.NullString `db:"output_stdout"` OutputStderr sql.NullString `db:"output_stderr"` ExitCode sql.NullInt16 } type ScriptExecutionBrief struct { ID int TimeStart sql.NullTime `db:"time_start"` TimeEnd sql.NullTime `db:"time_end"` ScriptName string `db:"script_name"` SSH string ExitCode sql.NullInt16 HasSource bool `db:"has_source"` HasData bool `db:"has_data"` HasEnv bool `db:"has_env"` HasOutputStdout bool `db:"has_output_stdout"` HasOutputStderr bool `db:"has_output_stderr"` } func NewScriptScheduler() (sched ScriptScheduler) { sched.EventQueue = make(chan string, 64) return } func (self ScriptScheduler) Loop() { // {{{ // Lets check for somehow missed executions every minute. // An event SHOULD be received for each new created, but let's be sure. tick := time.NewTicker(time.Second * 60) var event string for { select { case <-tick.C: self.HandleNextExecution() case event = <-self.EventQueue: if event == "SCRIPT_SCHEDULED" { self.HandleNextExecution() } } } } // }}} func (self ScriptScheduler) HandleNextExecution() { // {{{ se, err := self.GetNextExecution() if err != nil { logger.Error("script_scheduler", "error", err) return } if se.ID == 0 { return } // Setting the time_start value on the database row makes sure it doesn't get handled again. se.TimeStart.Time = time.Now() se.TimeStart.Valid = true se.Update() logger.Info("script_scheduler", "op", "execute", "id", se.ID) var fnames []string fnames, err = se.UploadScript() if err != nil { err = werr.Wrap(err) logger.Error("script_execution", "op", "upload_script", "id", se.ID, "error", err) return } err = se.UploadEnv(fnames[ENV_NAME], fnames[SCRIPT_NAME]) if err != nil { err = werr.Wrap(err) logger.Error("script_execution", "op", "upload_env", "id", se.ID, "error", err) return } err = se.RunScript(fnames[ENV_NAME]) if err != nil { err = werr.Wrap(err) logger.Error("script_execution", "op", "run_script", "id", se.ID, "error", err) return } se.SSHCommand([]byte{}, false, fmt.Sprintf("rm %s %s", fnames[ENV_NAME], fnames[SCRIPT_NAME])) logger.Info("script_scheduler", "op", "handled", "script", fnames[SCRIPT_NAME]) } // }}} func (self ScriptScheduler) GetNextExecution() (e ScriptExecution, err error) { // {{{ row := db.QueryRowx(` SELECT e.id, time_start, time_end, data, ssh, env, output_stdout, output_stderr, exitcode, sl.source FROM execution e INNER JOIN script_log sl ON e.script_log_id = sl.id WHERE time_start IS NULL ORDER BY id ASC `) err = row.StructScan(&e) // Returned ScriptExecution is having an ID of 0 if none was returned if err == sql.ErrNoRows { err = nil return } if err != nil { err = werr.Wrap(err) return } return } // }}} func (se *ScriptExecution) Update() (err error) { // {{{ _, err = db.Exec(` UPDATE public.execution SET time_start = $2, time_end = $3, output_stdout = $4, output_stderr = $5, exitcode = $6 WHERE id=$1`, se.ID, se.TimeStart, se.TimeEnd, se.OutputStdout, se.OutputStderr, se.ExitCode, ) if err != nil { err = werr.Wrap(err) logger.Error("script_execution", "op", "execute", "id", se.ID, "error", err) return } return } // }}} func (se *ScriptExecution) SSHCommand(stdin []byte, log bool, args ...string) (stdoutString string, err error) { // {{{ params := []string{se.SSH} params = append(params, args...) cmd := exec.Command("ssh", params...) stdout := new(bytes.Buffer) stderr := new(bytes.Buffer) cmd.Stdin = bytes.NewReader(stdin) cmd.Stdout = stdout cmd.Stderr = stderr err = cmd.Run() // A cleanup command is run after the script. This shouldn't overwrite the output from the actual script. if log { se.OutputStdout.String = stdout.String() se.OutputStderr.String = stderr.String() se.ExitCode.Int16 = int16(cmd.ProcessState.ExitCode()) se.OutputStdout.Valid = true se.OutputStderr.Valid = true se.ExitCode.Valid = true } se.TimeEnd.Time = time.Now() se.TimeEnd.Valid = true se.Update() if err != nil { err = werr.Wrap(err) return } return stdout.String(), nil } // }}} func (se *ScriptExecution) UploadScript() (fnames []string, err error) { // {{{ var filenames string filenames, err = se.SSHCommand( []byte(se.Source), true, `sh -c 'RUNENV=$(mktemp -t datagraph.XXXXXX) && SCRIPT=$(mktemp -t datagraph.XXXXXX) && touch $RUNENV $SCRIPT && chmod 700 $RUNENV $SCRIPT && cat >$SCRIPT && echo $RUNENV $SCRIPT'`, ) if err != nil { err = werr.Wrap(err) } fnames = strings.Split(strings.TrimSpace(filenames), " ") if len(fnames) != 2 { err = werr.New("Invalid temp filename count: %d", len(fnames)) return } return fnames[:2], nil } // }}} func (se *ScriptExecution) UploadEnv(envFname, scriptFname string) (err error) { // {{{ env := make(map[string]string) err = json.Unmarshal([]byte(se.Env), &env) if err != nil { err = werr.Wrap(err) return } var script = "#!/bin/sh\n\n" for key, val := range env { script = script + fmt.Sprintf("export %s=\"%s\"\n", key, strings.ReplaceAll(val, `"`, `\"`)) } script = script + "\n" + scriptFname + "\n" _, err = se.SSHCommand( []byte(script), true, fmt.Sprintf(`sh -c 'cat >%s'`, envFname), ) if err != nil { err = werr.Wrap(err) } return } // }}} func (se *ScriptExecution) RunScript(fname string) (err error) { // {{{ _, err = se.SSHCommand([]byte(se.Data), true, fname) if err != nil { err = werr.Wrap(err) } return } // }}} func GetScriptExecutions() (executions []ScriptExecutionBrief, err error) { // {{{ executions = []ScriptExecutionBrief{} var rows *sqlx.Rows rows, err = db.Queryx(` SELECT e.id, time_start, time_end, ssh, sl.name AS script_name, exitcode, LENGTH(source) > 0 AS has_source, LENGTH(data::varchar) > 0 AS has_data, LENGTH(env::varchar) > 0 AS has_env, LENGTH(output_stdout) > 0 AS has_output_stdout, LENGTH(output_stderr) > 0 AS has_output_stderr FROM execution e INNER JOIN script_log sl ON e.script_log_id = sl.id ORDER BY id DESC LIMIT 100 `) if err != nil { err = werr.Wrap(err) return } defer rows.Close() for rows.Next() { var execution ScriptExecutionBrief err = rows.StructScan(&execution) if err != nil { err = werr.Wrap(err) return } executions = append(executions, execution) } return } // }}} func GetScriptExecution(id int) (e ScriptExecution, err error) { row := db.QueryRowx(` SELECT e.id, time_start, time_end, ssh, sl.name AS script_name, sl.source, exitcode, data, env, output_stdout, output_stderr FROM execution e INNER JOIN script_log sl ON e.script_log_id = sl.id WHERE e.id = $1`, id, ) err = row.StructScan(&e) if err != nil { err = werr.Wrap(err) return } return }