All about reducer count
Finding the right number of reducers for a flow, or for part of it, is an important but tedious step toward good performance and efficient use of cluster resources. This page describes Scalding's behaviour and the facilities it provides to help with that process.
By default, Scalding uses the JobConf's "mapred.reduce.tasks" value as the number of reducers for every step of the flow. If that key is not set, it defaults to 1.
Job developers may set the number of reducers for an individual aggregation with the withReducers method on *Grouped instances.
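A minimal sketch of setting the reducer count for one aggregation, assuming the typed API of scalding-core is on the classpath. The job name `WordCountJob` and the `--input`/`--output` arguments are hypothetical; only the `withReducers` call on the grouped result is the point here.

```scala
import com.twitter.scalding._

// Hypothetical word-count job used only to show where withReducers goes.
class WordCountJob(args: Args) extends Job(args) {
  TypedPipe.from(TextLine(args("input")))
    .flatMap(_.split("\\s+"))
    .filter(_.nonEmpty)
    .map(word => (word, 1L))
    .sumByKey         // grouped aggregation (UnsortedGrouped[String, Long])
    .withReducers(10) // run this aggregation with 10 reducers
    .toTypedPipe
    .write(TypedTsv[(String, Long)](args("output")))
}
```

Steps without an explicit `withReducers` call fall back to the flow-wide value described above.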
Reducer count estimation
Scalding can optionally be instrumented to estimate the ideal number of reducers for each step of the flow, using the facilities in the reducer_estimation package. The estimation logic is provided by ReducerEstimator implementations and injected through Cascading's FlowStepStrategy mechanism.
Instructing Scalding to apply a reducer estimation policy
The following command-line argument instructs Scalding to use one or more ReducerEstimator implementations:
-Dscalding.reducer.estimator.classes=<comma-separated list of fully qualified estimator class names>
Defaults to "" (no estimator).
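For example, assuming the job is launched with `hadoop jar` through `com.twitter.scalding.Tool`, the property is passed as a generic Hadoop option placed before the job class. The jar name, job class and paths below are hypothetical.

```shell
# Hypothetical jar, job class and paths; the -D option must precede the job class.
hadoop jar my-jobs-assembly.jar com.twitter.scalding.Tool \
  -Dscalding.reducer.estimator.classes=com.twitter.scalding.reducer_estimation.InputSizeReducerEstimator \
  com.example.MyJob --hdfs --input /data/in --output /data/out
```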
A programmatic approach is also available:
```scala
class MyJob(args: Args) extends Job(args) {
  ...
  scaldingConfig.addReducerEstimator(...)
  ...
}
```
Whether the estimator-computed value takes precedence over a reducer count set explicitly in the job (for example with withReducers) is controlled by:
-Dscalding.reducer.estimator.override=<boolean>
Defaults to false, in which case an explicitly set reducer count is kept.
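The precedence rules described so far (JobConf value, explicit withReducers value, estimator result, override flag) can be summarized by the pure-Scala model below. It is a hypothetical sketch of the documented behaviour, not Scalding's implementation; `ReducerChoice` and `chooseReducers` are illustrative names, and it assumes that when no explicit value is set, an available estimate replaces the JobConf value.

```scala
// Hypothetical model of how one step's reducer count is chosen; not Scalding's source.
object ReducerChoice {
  /**
   * @param explicit         count set with withReducers, if any
   * @param estimated        count returned by the configured estimator(s), if any
   * @param overrideExplicit value of scalding.reducer.estimator.override
   * @param jobConfDefault   mapred.reduce.tasks from the JobConf, if present
   */
  def chooseReducers(
      explicit: Option[Int],
      estimated: Option[Int],
      overrideExplicit: Boolean,
      jobConfDefault: Option[Int]): Int =
    explicit match {
      // An explicit count wins unless override is enabled.
      case Some(n) if !overrideExplicit => n
      // Otherwise: estimate, then explicit count, then JobConf value, then 1.
      case _ => estimated.orElse(explicit).orElse(jobConfDefault).getOrElse(1)
    }
}
```

For instance, `chooseReducers(Some(10), Some(42), overrideExplicit = false, None)` keeps 10, while the same call with `overrideExplicit = true` uses the estimate of 42.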
Selecting the reducer estimation policy
Scalding comes with two built-in ReducerEstimator implementations:
- InputSizeReducerEstimator computes the number of reducers from the input size and a fixed "bytesPerReducer" value, which can be set with
  -Dscalding.reducer.estimator.bytes.per.reducer=12345
  Defaults to 8GB.
- RatioBasedEstimator computes the average ratio of mapper bytes to reducer bytes over previous steps and uses it to scale the estimate produced by InputSizeReducerEstimator.
  RatioBasedEstimator can optionally ignore history items whose input size differs drastically from that of the current job. The
  -Dscalding.reducer.estimator.input.ratio.threshold=<float>
  parameter sets the lower bound on the allowable input size ratio. It defaults to 0.10 (10%), which makes the upper bound 10x. The number of historical steps used in the computation is set with
  -Dscalding.reducer.estimator.max.history=<int>
  Defaults to 1.
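The arithmetic behind both policies can be sketched in plain Scala. This is a hypothetical model under stated assumptions, not the library code: the input-size estimate is assumed to be the input size divided by bytesPerReducer, rounded up, with a minimum of 1; the ratio is assumed to be reducer bytes divided by mapper bytes, averaged over at most `maxHistory` history items; and a history item is assumed to be kept only when current/past input size lies between the threshold and its inverse. `EstimatorSketch` and `StepHistory` are illustrative names.

```scala
// Hypothetical sketches of the two built-in estimation policies; not Scalding's source.
object EstimatorSketch {
  /** Input-size policy: one reducer per bytesPerReducer of input, at least 1. */
  def inputSizeEstimate(inputBytes: Long, bytesPerReducer: Long): Int = {
    require(bytesPerReducer > 0, "bytesPerReducer must be positive")
    math.max(1, math.ceil(inputBytes.toDouble / bytesPerReducer).toInt)
  }

  /** One history item: bytes read by a past step's mappers and by its reducers. */
  final case class StepHistory(mapperBytes: Long, reducerBytes: Long)

  /** Keep a history item only if current/past input size is within [threshold, 1/threshold]. */
  def acceptableInput(currentInput: Long, pastInput: Long, threshold: Double): Boolean = {
    val ratio = currentInput.toDouble / pastInput
    threshold <= 0 || (ratio >= threshold && ratio <= 1.0 / threshold)
  }

  /** Ratio-based policy: scale the input-size estimate by the average reducer/mapper byte ratio. */
  def ratioBasedEstimate(
      currentInput: Long,
      bytesPerReducer: Long,
      history: Seq[StepHistory],
      threshold: Double,
      maxHistory: Int): Option[Int] = {
    val usable = history
      .take(maxHistory) // only the most recent maxHistory items
      .filter(h => h.mapperBytes > 0 && acceptableInput(currentInput, h.mapperBytes, threshold))
    if (usable.isEmpty) None // no usable history: no estimate
    else {
      val avgRatio = usable.map(h => h.reducerBytes.toDouble / h.mapperBytes).sum / usable.size
      val base = inputSizeEstimate(currentInput, bytesPerReducer)
      Some(math.max(1, math.ceil(base * avgRatio).toInt))
    }
  }
}
```

With 20 GB of input and 8 GB per reducer, the input-size estimate is 3; a single history item whose reducers read half of what its mappers read scales that to 2 (3 × 0.5 = 1.5, rounded up). A history item with only 1 GB of input is 20x smaller than the current job and is ignored at the default threshold of 0.10.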
Custom ReducerEstimator implementations may also be provided.
Monitoring and debugging
For each FlowStep, both the value originally present in the JobConf and the estimator-computed value can be inspected in Hadoop's JobTracker under the following keys:
- "scalding.reducer.estimator.result": what the reducer estimator recommended, whether or not it was used.
- "scalding.reducer.estimator.original.mapred.reduce.tasks": what the original job config was.
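When comparing these values across many steps, a small helper can turn a step's configuration into a one-line summary. This is a hypothetical sketch: it assumes the step configuration has already been exported as a `Map[String, String]` (for example from the job's configuration page), and it assumes "mapred.reduce.tasks" holds the value actually applied to the step. `EstimatorReport` and `summarize` are illustrative names.

```scala
// Hypothetical debugging helper; the input map is assumed to hold one step's configuration.
object EstimatorReport {
  val ResultKey   = "scalding.reducer.estimator.result"
  val OriginalKey = "scalding.reducer.estimator.original.mapred.reduce.tasks"
  val AppliedKey  = "mapred.reduce.tasks" // assumed to hold the value the step ran with

  /** Summarizes original, recommended and applied reducer counts for one step. */
  def summarize(stepConf: Map[String, String]): String =
    stepConf.get(ResultKey) match {
      case None => "no estimate recorded"
      case Some(recommended) =>
        val original = stepConf.getOrElse(OriginalKey, "unset")
        val applied  = stepConf.getOrElse(AppliedKey, "unset")
        s"original=$original, recommended=$recommended, applied=$applied"
    }
}
```

A step where the recommended and applied values differ is one where the estimate was computed but not used, for example because withReducers was set and override was left at false.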