-- import "github.com/Zemanta/mrgob/runner"
var (
ErrMissingJobPath = fmt.Errorf("Missing job path")
ErrMissingInput = fmt.Errorf("Missing input")
ErrMissingOutput = fmt.Errorf("Missing output")
)
var (
HadoopStatusIdle HadoopStatus = 0
HadoopStatusRunning HadoopStatus = 1
HadoopStatusSuccess HadoopStatus = 2
HadoopStatusFailed HadoopStatus = -1
ErrNotRunning = fmt.Errorf("Command not running")
ErrRunning = fmt.Errorf("Application running")
ErrStarted = fmt.Errorf("Application can only be run once")
ErrMissingApplicationId = fmt.Errorf("Missing application id")
ErrMissingHadoopProvider = fmt.Errorf("Missing Hadoop provider")
)
func ExecOnCluster(retries int, arguments ...string) error
func SetDefaultHadoopProvider(p provider.HadoopProvider)
type HadoopApplicationLogs struct {
Raw string
ContainerLogs []*HadoopContainerLogs
}
func (l *HadoopApplicationLogs) AppLog() string
func (l *HadoopApplicationLogs) StdErr() string
func (l *HadoopApplicationLogs) StdOut() string
func (l *HadoopApplicationLogs) String() string
func (l *HadoopApplicationLogs) SysLog() string
type HadoopApplicationStatus struct {
App struct {
AllocatedMB int `json:"allocatedMB"`
AllocatedVCores int `json:"allocatedVCores"`
AmContainerLogs string `json:"amContainerLogs"`
AmHostHTTPAddress string `json:"amHostHttpAddress"`
ApplicationTags string `json:"applicationTags"`
ApplicationType string `json:"applicationType"`
ClusterID int `json:"clusterId"`
Diagnostics string `json:"diagnostics"`
ElapsedTime int `json:"elapsedTime"`
FinalStatus string `json:"finalStatus"`
FinishedTime int `json:"finishedTime"`
ID string `json:"id"`
MemorySeconds int `json:"memorySeconds"`
Name string `json:"name"`
NumAMContainerPreempted int `json:"numAMContainerPreempted"`
NumNonAMContainerPreempted int `json:"numNonAMContainerPreempted"`
PreemptedResourceMB int `json:"preemptedResourceMB"`
PreemptedResourceVCores int `json:"preemptedResourceVCores"`
Progress float64 `json:"progress"`
Queue string `json:"queue"`
RunningContainers int `json:"runningContainers"`
StartedTime int `json:"startedTime"`
State string `json:"state"`
TrackingUI string `json:"trackingUI"`
TrackingURL string `json:"trackingUrl"`
User string `json:"user"`
VcoreSeconds int `json:"vcoreSeconds"`
} `json:"app"`
}
type HadoopCommand struct {
}
func NewMapReduce(c *MapReduceConfig) (*HadoopCommand, error)
func NewRawMapReduce(arguments ...string) *HadoopCommand
func (hc *HadoopCommand) ApplicationId() (string, error)
func (hc *HadoopCommand) CmdOutput() (stdOut string, stdErr string, cmdErr error)
func (hc *HadoopCommand) FetchApplicationLogs() (*HadoopApplicationLogs, error)
func (hc *HadoopCommand) FetchApplicationStatus() (*HadoopApplicationStatus, error)
func (hc *HadoopCommand) FetchDebugData() (*HadoopDebugData, error)
func (hc *HadoopCommand) FetchJobCounters() (HadoopJobCounters, error)
func (hc *HadoopCommand) Run() HadoopStatus
func (hc *HadoopCommand) SetRetries(n int)
func (hc *HadoopCommand) Status() HadoopStatus
func (hc *HadoopCommand) Tries() []*HadoopRun
func (hc *HadoopCommand) Wait() HadoopStatus
type HadoopContainerLogs struct {
Container string
Host string
StdOut string
StdErr string
SysLog string
AppLog string
}
type HadoopDebugData struct {
Logs *HadoopApplicationLogs
Counters HadoopJobCounters
Status *HadoopApplicationStatus
StdOut string
StdErr string
CmdErr error
}
type HadoopJobCounterData struct {
Name string `json:"name"`
MapCounterValue int `json:"mapCounterValue"`
ReduceCounterValue int `json:"reduceCounterValue"`
TotalCounterValue int `json:"totalCounterValue"`
}
type HadoopJobCounters map[string]HadoopJobCountersGroup
func (c HadoopJobCounters) AppCounters() HadoopJobCountersGroup
type HadoopJobCountersGroup map[string]HadoopJobCounterData
func (c HadoopJobCountersGroup) String() string
type HadoopRun struct {
}
func (hr *HadoopRun) ApplicationId() (string, error)
func (hr *HadoopRun) CmdOutput() (stdOut string, stdErr string, cmdErr error)
func (hr *HadoopRun) FetchApplicationLogs() (*HadoopApplicationLogs, error)
func (hr *HadoopRun) FetchApplicationStatus() (*HadoopApplicationStatus, error)
func (hr *HadoopRun) FetchDebugData() (*HadoopDebugData, error)
func (hr *HadoopRun) FetchJobCounters() (HadoopJobCounters, error)
type HadoopStatus int
type MapReduceConfig struct {
// Job name.
Name string
// Number of reducers.
ReduceTasks int
// Number of mappers.
MapTasks int
// S3 or HDFS path to the executable job implementing the "Init*Job" interface.
JobPath string
// Job configuration that will be made available in mapper and reducer jobs.
JobConfig interface{}
// List of input files.
Input []string
// Output directory.
Output string
// Other custom -D properties passed to the job.
CustomProperties map[string]string
// Other files that will be downloaded next to the executable before running the job.
AdditionalFiles []string
// Environment options passed to the mapreduce jobs.
Env map[string]string
}