Tools and helpers for writing and running MapReduce jobs on Hadoop and EMR.
job.Init*Job methods register command-line flags that select whether the binary runs as the mapper or the reducer.
example --stage=mapper
example --stage=reducer
Supported job types
func InitRawJob(mapper func(io.Writer, io.Reader), reducer func(io.Writer, io.Reader))
func InitByteJob(mapper func(*ByteKVWriter, io.Reader), reducer func(io.Writer, *ByteKVReader))
func InitJsonJob(mapper func(*JsonKVWriter, io.Reader), reducer func(io.Writer, *JsonKVReader))
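As a minimal sketch, a word-count binary could wire its mapper and reducer through InitRawJob; the import path here is an assumption and the word-count logic is purely illustrative:

package main

import (
	"bufio"
	"fmt"
	"io"

	"github.com/example/mrgob/job" // hypothetical import path
)

// mapper emits one "<word>\t1" line per input word.
func mapper(w io.Writer, r io.Reader) {
	s := bufio.NewScanner(r)
	s.Split(bufio.ScanWords)
	for s.Scan() {
		fmt.Fprintf(w, "%s\t1\n", s.Text())
	}
}

// reducer sums the counts from the sorted mapper output.
func reducer(w io.Writer, r io.Reader) {
	counts := map[string]int{}
	s := bufio.NewScanner(r)
	for s.Scan() {
		var word string
		var n int
		fmt.Sscanf(s.Text(), "%s\t%d", &word, &n)
		counts[word] += n
	}
	for word, n := range counts {
		fmt.Fprintf(w, "%s\t%d\n", word, n)
	}
}

func main() {
	// Registers the --stage flag and runs the selected function.
	job.InitRawJob(mapper, reducer)
}

Building this yields a single executable that Hadoop streaming can invoke as example --stage=mapper or example --stage=reducer, matching the flags above.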
job.Log is an instance of Go's log.Logger that writes each line to stderr with a predefined prefix so the runner can extract the log lines.
job.Log.Print("My log line")
job.Count writes a counter line to stderr using the predefined counter group so the runner can fetch the counters.
job.Count("myCounter", 1)
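For example, a mapper might count the lines it processes and log scan failures (a sketch; the counter name is arbitrary):

func mapper(w io.Writer, r io.Reader) {
	s := bufio.NewScanner(r)
	for s.Scan() {
		job.Count("linesRead", 1)
		fmt.Fprintln(w, s.Text())
	}
	if err := s.Err(); err != nil {
		job.Log.Print("input scan failed: ", err)
	}
}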
job.Config retrieves and decodes the job config passed from the runner.
cfg := map[string]string{}
err := job.Config(&cfg)
For testing mappers and reducers, use the tester.Test*Job functions, which simulate MapReduce by streaming the input into the mapper, sorting the mapper's output, streaming the sorted output into the reducer, and writing the reducer's output to the given output writer.
func TestRawJob(input []io.Reader, output io.Writer, mapper func(io.Writer, io.Reader), reducer func(io.Writer, io.Reader))
func TestByteJob(input []io.Reader, output io.Writer, mapper func(*ByteKVWriter, io.Reader), reducer func(io.Writer, *ByteKVReader))
func TestJsonJob(input []io.Reader, output io.Writer, mapper func(*JsonKVWriter, io.Reader), reducer func(io.Writer, *JsonKVReader))
If an input reader is an instance of tester.Reader, you can also pass in a filename, which will be exposed to the mapper as the mapreduce_map_input_file environment variable.
files := []io.Reader{
&tester.Reader{Filename: "filename", Data: someReader},
}
tester.TestByteJob(files, out, mapper, reducer)
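A complete test could then look like this, reusing the word-count mapper and reducer sketched earlier (bytes, strings, and testing are from the standard library):

func TestWordCount(t *testing.T) {
	input := []io.Reader{
		strings.NewReader("b a a"),
		strings.NewReader("c b"),
	}
	var out bytes.Buffer
	// Streams input through the mapper, sorts, and reduces into out.
	tester.TestRawJob(input, &out, mapper, reducer)
	if !strings.Contains(out.String(), "a\t2") {
		t.Errorf("unexpected reducer output: %q", out.String())
	}
}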
API reference for the job package
MrGob requires both API access (for fetching job status) and SSH access (for executing commands) to the cluster. At the moment the only supported provider is AWS EMR.
awsConfig := &aws.Config{
Region: &app.Env.AWS_REGION,
}
sshConfig := &ssh.ClientConfig{
User: "hadoop",
Auth: []ssh.AuthMethod{ssh.PublicKeys(sshKey)},
}
runner.SetDefaultHadoopProvider(runner.NewEmrProvider("eventlog-processor", sshConfig, awsConfig))
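The sshKey signer used above can be loaded from a PEM file with golang.org/x/crypto/ssh; note that recent versions of that package also require a HostKeyCallback on the ClientConfig (for example ssh.InsecureIgnoreHostKey() in test environments):

pemBytes, err := os.ReadFile("/path/to/emr-key.pem")
if err != nil {
	log.Fatal(err)
}
sshKey, err := ssh.ParsePrivateKey(pemBytes)
if err != nil {
	log.Fatal(err)
}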
Passing command-line arguments directly
cmd := runner.NewRawMapReduce(args...)
Using MapReduceConfig
cmd, err := runner.NewMapReduce(&runner.MapReduceConfig{
Name: "job-name",
JobPath: "s3://bucket/jobFile",
ReduceTasks: 1,
MapTasks: 1,
JobConfig: map[string]string{"test": "123"},
Input: []string{"s3://bucket/files/"},
Output: "s3://bucket/output/",
CustomProperties: map[string]string{
"mapreduce.job.queuename": "myqueue",
},
})
// Sync
status = cmd.Run()
// Async
go cmd.Run()
status = cmd.Wait()
Each command can be run only once.
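Putting the pieces together, an asynchronous run could look like this (a sketch; the concrete status type depends on the runner):

cmd, err := runner.NewMapReduce(cfg)
if err != nil {
	log.Fatal(err)
}
go cmd.Run()
status := cmd.Wait()
log.Printf("job finished with status %v", status)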
// All the data
mrd, err := cmd.FetchDebugData()
// Hadoop command output
stdOut, stdErr, cmdErr = cmd.CmdOutput()
// Mapper and reducer logs (can only be called once the job is completed)
logs, err = cmd.FetchApplicationLogs()
// Counters
counters, err = cmd.FetchJobCounters()
Arbitrary commands can also be executed on the cluster over SSH, with the given number of retries:
err := runner.ExecOnCluster(retries, "aws", "s3", "ls", "/path")