Divide and Recombine for Large, Complex Data
Author: Ryan Hafen
Version: 0.7.5
Date: 2015-01-06
License: BSD_3_clause + file LICENSE
Description
Methods for dividing data into subsets, applying analytical methods to the subsets, and recombining the results. Comes with a generic MapReduce interface as well. Works with key-value pairs stored in memory, on local disk, or on HDFS, in the latter case using the R and Hadoop Integrated Programming Environment (RHIPE). Also includes experimental support for key-value pairs stored as Spark RDDs using the SparkR package.
Depends
R (>= 2.15.1)
Suggests
testthat (>= 0.8.1), roxygen2 (>= 4.0.0), Rhipe, SparkR
Add Key-Value Pairs to a Data Connection
Description
Add key-value pairs to a data connection
Usage
addData(conn, data, overwrite = FALSE)
Arguments
- conn
- a kvConnection object
- data
- a list of key-value pairs (list of lists where each sub-list has two elements, the key and the value)
- overwrite
- if data with the same key is already present in the data, should it be overwritten? (does not work for HDFS connections)
Note
This is generally not recommended for HDFS as it writes a new file each time it is called, and can result in more individual files than Hadoop likes to deal with.
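A minimal sketch of adding key-value pairs to a local disk connection (assumes the datadr package is installed; the path is illustrative, and autoYes = TRUE is assumed to skip the interactive directory-creation prompt):

```r
library(datadr)

# create a local disk connection in a temporary directory
conn <- localDiskConn(file.path(tempdir(), "irisKV"), autoYes = TRUE)

# add two key-value pairs, keyed by row range
addData(conn, list(
  list("rows1-50",   iris[1:50, ]),
  list("rows51-100", iris[51:100, ])
))
```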
See also
removeData, localDiskConn, hdfsConn
Author
Ryan Hafen
Add a Transformation Function to a Distributed Data Object
Description
Add a transformation function to be applied to each subset of a distributed data object
Usage
addTransform(obj, fn, name = NULL, params = NULL, packages = NULL)
Arguments
- obj
- a distributed data object
- fn
- a function to be applied to each subset of obj
- name
- optional name of the transformation
- params
- a named list of parameters external to obj that are needed in the transformation function (most should be taken care of automatically such that this is rarely necessary to specify)
- packages
- a vector of R package names that contain functions used in fn (most should be taken care of automatically such that this is rarely necessary to specify)
Details
When you add a transformation to a distributed data object, the transformation is not applied immediately, but is deferred until a function that kicks off a computation is called. These include divide, recombine, drJoin, drLapply, drFilter, drSample, and drSubset. When any of these is invoked on an object with a transformation attached, the transformation is applied in the map phase of computation prior to any other computation. The transformation is also applied any time a subset of the data is requested. Thus, although the data has not been physically transformed after a call to addTransform, we can think of it conceptually as already being transformed.
When addTransform is called, it is tested on a subset of the data to make sure all of the global variables and packages necessary to portably perform the transformation are available.
It is possible to add multiple transformations to a distributed data object, in which case they are applied in the order supplied, though a single transformation should typically suffice.
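The deferred behavior described above can be sketched as follows (assuming datadr is installed):

```r
library(datadr)

# divide iris into one subset per species
bySpecies <- divide(ddf(iris), by = "Species")

# attach a transformation; nothing is computed yet
meanPetal <- addTransform(bySpecies, function(x) mean(x$Petal.Length))

# the transformation is applied in the map phase of the recombination
recombine(meanPetal, combine = combRbind)
```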
"Census Income" Dataset
Description
"Census Income" dataset from UCI machine learning repository
Usage
adult
Format
(From UCI machine learning repository)
- age. continuous
- workclass. Private, Self-emp-not-inc, Self-emp-inc, Federal-gov, Local-gov, State-gov, Without-pay, Never-worked
- fnlwgt. continuous
- education. Bachelors, Some-college, 11th, HS-grad, Prof-school, Assoc-acdm, Assoc-voc, 9th, 7th-8th, 12th, Masters, 1st-4th, 10th, Doctorate, 5th-6th, Preschool
- educationnum. continuous
- marital. Married-civ-spouse, Divorced, Never-married, Separated, Widowed, Married-spouse-absent, Married-AF-spouse
- occupation. Tech-support, Craft-repair, Other-service, Sales, Exec-managerial, Prof-specialty, Handlers-cleaners, Machine-op-inspct, Adm-clerical, Farming-fishing, Transport-moving, Priv-house-serv, Protective-serv, Armed-Forces
- relationship. Wife, Own-child, Husband, Not-in-family, Other-relative, Unmarried
- race. White, Asian-Pac-Islander, Amer-Indian-Eskimo, Other, Black
- sex. Female, Male
- capgain. continuous
- caploss. continuous
- hoursperweek. continuous
- nativecountry. United-States, Cambodia, England, Puerto-Rico, Canada, Germany, Outlying-US(Guam-USVI-etc), India, Japan, Greece, South, China, Cuba, Iran, Honduras, Philippines, Italy, Poland, Jamaica, Vietnam, Mexico, Portugal, Ireland, France, Dominican-Republic, Laos, Ecuador, Taiwan, Haiti, Columbia, Hungary, Guatemala, Nicaragua, Scotland, Thailand, Yugoslavia, El-Salvador, Trinadad&Tobago, Peru, Hong, Holand-Netherlands
- income. <=50K, >50K
- incomebin. 0 if income<=50K, 1 if income>50K
References
Bache, K. & Lichman, M. (2013). UCI Machine Learning Repository [http://archive.ics.uci.edu/ml]. Irvine, CA: University of California, School of Information and Computer Science.
Applies the transformation function(s)
Description
This is called internally in the map phase of datadr MapReduce jobs. It is not meant for use outside of there, but is exported for convenience.
Usage
applyTransform(transFns, x, env = NULL)
Arguments
- transFns
- from the "transforms" attribute of a ddo object
- x
- a subset of the object
- env
- the environment in which to evaluate the function (should be instantiated by calling setupTransformEnv) - if NULL, the environment will be set up for you
Turn 'ddf' Object into Data Frame
Description
Rbind all the rows of a 'ddf' object into a single data frame
Usage
as.data.frame(x, row.names = NULL, optional = FALSE, keys = TRUE, splitVars = TRUE, bsvs = FALSE, ...)
Arguments
- x
- a 'ddf' object
- row.names
- passed to as.data.frame
- optional
- passed to as.data.frame
- keys
- should the key be added as a variable in the resulting data frame? (if the key is not a character, it will be replaced with an md5 hash)
- splitVars
- should the values of the splitVars be added as variables in the resulting data frame?
- bsvs
- should the values of bsvs be added as variables in the resulting data frame?
- ...
- additional arguments passed to as.data.frame
Turn 'ddo' / 'ddf' Object into a list
Description
Turn a 'ddo' / 'ddf' object into a list
Usage
as.list(x, ...)
Arguments
- x
- a 'ddo' / 'ddf' object
- ...
- additional arguments passed to as.list
Construct Between Subset Variable (BSV)
Description
Construct between subset variable (BSV)
Usage
bsv(val = NULL, desc = "")
Arguments
- val
- a scalar character, numeric, or date
- desc
- a character string describing the BSV
Details
Should be called inside the bsvFn argument to divide, used for constructing a BSV list for each subset of a division.
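A sketch of constructing BSVs inside a division (the BSV name meanSL is illustrative; assumes datadr is installed):

```r
library(datadr)

# compute a between-subset variable for each species subset
bySpecies <- divide(ddf(iris), by = "Species",
  bsvFn = function(x)
    list(meanSL = bsv(mean(x$Sepal.Length), desc = "mean sepal length")))

# retrieve the BSVs for one subset
getBsvs(bySpecies[[1]]$value)
```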
See also
divide, getBsvs, bsvInfo
Author
Ryan Hafen
Character File Hash Function
Description
Function to be used to specify the file where key-value pairs get stored for local disk connections, useful when keys are scalar strings. Should be passed as the argument fileHashFn to localDiskConn.
Usage
charFileHash(keys, conn)
Arguments
- keys
- keys to be hashed
- conn
- a "localDiskConn" object
Details
You shouldn't need to call this directly other than to experiment with what the output looks like or to get ideas on how to write your own custom hash.
See also
localDiskConn, digestFileHash
Author
Ryan Hafen
"Collect" Recombination
Description
"Collect" recombination - simply collect the results into a local list of key-value pairs
Usage
combCollect(...)
Arguments
- ...
- additional arguments
Details
This is an experimental prototype. It is to be passed as the argument combine to recombine.
See also
divide, recombine, combDdo, combMeanCoef, combRbind, combMean
Author
Ryan Hafen
"DDO" Recombination
Description
"DDO" recombination - simply collect the results into a "ddo" object
Usage
combDdo(...)
Arguments
- ...
- additional arguments
Details
This is an experimental prototype. It is to be passed as the argument combine to recombine.
See also
divide, recombine, combCollect, combMeanCoef, combRbind, combMean
Author
Ryan Hafen
Mean Recombination
Description
Mean recombination
Usage
combMean(...)
Arguments
- ...
- additional arguments
Details
This is an experimental prototype. It is to be passed as the argument combine to recombine.
See also
divide, recombine, combCollect, combDdo, combRbind, combMeanCoef
Author
Ryan Hafen
Mean Coefficient Recombination
Description
Mean coefficient recombination
Usage
combMeanCoef(...)
Arguments
- ...
- additional arguments
Details
This is an experimental prototype. It is to be passed as the argument combine to recombine. It expects to be dealing with named vectors including an element n specifying the number of rows in that subset.
See also
divide, recombine, rrDiv, combCollect, combDdo, combRbind, combMean
Author
Ryan Hafen
"rbind" Recombination
Description
"rbind" recombination
Usage
combRbind(...)
Arguments
- ...
- additional arguments
Details
This is an experimental prototype. It is to be passed as the argument combine to recombine.
See also
divide, recombine, combDdo, combCollect, combMeanCoef, combMean
Author
Ryan Hafen
Conditioning Variable Division
Description
Specify conditioning variable division parameters for data division
Usage
condDiv(vars)
Arguments
- vars
- a character string or vector of character strings specifying the variables of the input data across which to divide
Value
a list to be used for the "by" argument to divide
Details
Currently each unique combination of values of vars constitutes a subset. In the future, specifying shingles for numeric conditioning variables will be implemented.
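Since a character vector passed to the "by" argument of divide is interpreted as a conditioning division, the following two calls are equivalent (sketch, assuming datadr is installed):

```r
library(datadr)

# divide by a variable name directly, or via condDiv - same result
d1 <- divide(ddf(iris), by = "Species")
d2 <- divide(ddf(iris), by = condDiv("Species"))
```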
References
- http://www.datadr.org
- Guha, S., Hafen, R., Rounds, J., Xia, J., Li, J., Xi, B., & Cleveland, W. S. (2012). Large complex data: divide and recombine (D&R) with RHIPE. Stat, 1(1), 53-67.
See also
divide, getSplitVars, getSplitVar
Author
Ryan Hafen
Convert 'ddo' / 'ddf' Objects
Description
Convert 'ddo' / 'ddf' objects between different storage backends
Usage
convert(from, to)
Arguments
- from
- a 'ddo' or 'ddf' object
- to
- a 'kvConnection' object (created with localDiskConn or hdfsConn) or NULL if an in-memory 'ddo' / 'ddf' is desired
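A sketch of moving an in-memory 'ddf' to a local disk backend (the path is illustrative; autoYes = TRUE is assumed to skip the interactive prompt):

```r
library(datadr)

irisDdf <- ddf(iris)

# copy the data to a local disk connection
diskIris <- convert(irisDdf,
  localDiskConn(file.path(tempdir(), "irisDisk"), autoYes = TRUE))
```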
datadr
Description
Divide and recombine for large, complex data.
'ddf' Accessors
Description
The number of rows/columns of a 'ddf' object
Usage
nrow(x)
NROW(x)
ncol(x)
NCOL(x)
Arguments
- x
- a 'ddf' object
Instantiate a Distributed Data Frame ('ddf')
Description
Instantiate a distributed data frame ('ddf')
Usage
ddf(conn, transFn = NULL, update = FALSE, reset = FALSE, control = NULL, verbose = TRUE)
Arguments
- conn
- an object pointing to where data is or will be stored for the 'ddf' object - can be a 'kvConnection' object created from localDiskConn or hdfsConn, or a data frame or list of key-value pairs
- transFn
- a function to be applied to the key-value pairs of this data prior to doing any processing, that transforms the data into a data frame if it is not stored as such
- update
- should the attributes of this object be updated? See updateAttributes for more details.
- reset
- should all persistent metadata about this object be removed and the object created from scratch? This setting does not affect data stored in the connection location.
- control
- parameters specifying how the backend should handle things if attributes are updated (most likely parameters to rhwatch in RHIPE) - see rhipeControl and localDiskControl
- verbose
- logical - print messages about what is being done
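A minimal sketch of creating an in-memory 'ddf' from a data frame (assumes datadr is installed):

```r
library(datadr)

# update = TRUE computes attributes such as split sizes up front
irisDdf <- ddf(iris, update = TRUE)

# 'ddf' accessors
nrow(irisDdf)
names(irisDdf)
```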
Accessor Functions
Description
Accessor functions for attributes of 'ddo' and 'ddf' objects. Methods also include nrow and ncol for 'ddf' objects.
Usage
kvExample(x)
bsvInfo(x)
counters(x)
splitSizeDistn(x)
splitRowDistn(x)
getKeys(x)
summary(object, ...)
hasExtractableKV(x)
names(x)
length(x)
Arguments
- x
- a 'ddf'/'ddo' object
- object
- a 'ddf'/'ddo' object
- ...
- additional arguments
Instantiate a Distributed Data Object ('ddo')
Description
Instantiate a distributed data object ('ddo')
Usage
ddo(conn, update = FALSE, reset = FALSE, control = NULL, verbose = TRUE)
Arguments
- conn
- an object pointing to where data is or will be stored for the 'ddo' object - can be a 'kvConnection' object created from localDiskConn or hdfsConn, or a data frame or list of key-value pairs
- update
- should the attributes of this object be updated? See updateAttributes for more details.
- reset
- should all persistent metadata about this object be removed and the object created from scratch? This setting does not affect data stored in the connection location.
- control
- parameters specifying how the backend should handle things if attributes are updated (most likely parameters to rhwatch in RHIPE) - see rhipeControl and localDiskControl
- verbose
- logical - print messages about what is being done
Managing attributes of 'ddo' or 'ddf' objects
Description
Managing attributes of 'ddo' or 'ddf' objects
Usage
setAttributes(obj, attrs)
getAttribute(obj, attrName)
getAttributes(obj, attrNames)
hasAttributes(obj, attrNames)
Arguments
- obj
- 'ddo' or 'ddf' object
- attrs
- a named list of attributes to set
- attrName
- name of the attribute to get
- attrNames
- vector of names of the attributes to get
- ...
- additional arguments
Digest File Hash Function
Description
Function to be used to specify the file where key-value pairs get stored for local disk connections, useful when keys are arbitrary objects. File names are determined using an md5 hash of the object. This is the default argument for fileHashFn in localDiskConn.
Usage
digestFileHash(keys, conn)
Arguments
- keys
- keys to be hashed
- conn
- a "localDiskConn" object
Details
You shouldn't need to call this directly other than to experiment with what the output looks like or to get ideas on how to write your own custom hash.
See also
localDiskConn, charFileHash
Author
Ryan Hafen
Divide a Distributed Data Object
Description
Divide a ddo/ddf object into subsets based on different criteria
Usage
divide(data, by = NULL, spill = 1e+06, filterFn = NULL, bsvFn = NULL, output = NULL, overwrite = FALSE, preTransFn = NULL, postTransFn = NULL, params = NULL, packages = NULL, control = NULL, update = FALSE, verbose = TRUE)
Arguments
- data
- an object of class "ddf" or "ddo" - in the latter case, you need to specify preTransFn to coerce each subset into a data frame
- by
- specification of how to divide the data - conditioning variables (factor levels or shingles), random replicate, or near-exact replicate (to come) - see details
- spill
- integer telling the division method how many lines of data should be collected until spilling over into a new key-value pair
- filterFn
- a function applied to each candidate output key-value pair to determine whether it should be part of the resulting division (kept if it returns TRUE)
- bsvFn
- a function to be applied to each subset that returns a list of between subset variables (BSVs)
- output
- a "kvConnection" object indicating where the output data should reside (see localDiskConn, hdfsConn). If NULL (default), output will be an in-memory "ddo" object.
- overwrite
- logical; should an existing output location be overwritten? (can also specify overwrite = "backup" to move the existing output to _bak)
- preTransFn
- a transformation function (if desired) to apply to each subset prior to division - note: this is deprecated - instead use addTransform prior to calling divide
- postTransFn
- a transformation function (if desired) to apply to each post-division subset
- params
- a named list of parameters external to the input data that are needed in the distributed computing (most should be taken care of automatically such that this is rarely necessary to specify)
- packages
- a vector of R package names that contain functions used in fn (most should be taken care of automatically such that this is rarely necessary to specify)
- control
- parameters specifying how the backend should handle things (most likely parameters to rhwatch in RHIPE) - see rhipeControl and localDiskControl
- update
- should a MapReduce job be run to obtain additional attributes for the result data prior to returning?
- verbose
- logical - print messages about what is being done
Value
an object of class "ddf" if the resulting subsets are data frames. Otherwise, an object of class "ddo".
Details
The division methods this function will support include conditioning variable division for factors (implemented - see condDiv), conditioning variable division for numerical variables through shingles, random replicate (implemented - see rrDiv), and near-exact replicate. If by is a vector of variable names, the data will be divided by these variables. Alternatively, this can be specified explicitly, e.g. condDiv(c("var1", "var2")).
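Both implemented division types can be sketched as follows using the adult data included with the package (assuming datadr is installed; the subset size is illustrative):

```r
library(datadr)

# conditioning division: one subset per workclass in the adult data
byWork <- divide(ddf(adult), by = "workclass", update = TRUE)

# random replicate division: subsets of roughly 1000 rows each
rrAdult <- divide(ddf(adult), by = rrDiv(nrow = 1000))
```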
References
- http://www.datadr.org
- Guha, S., Hafen, R., Rounds, J., Xia, J., Li, J., Xi, B., & Cleveland, W. S. (2012). Large complex data: divide and recombine (D&R) with RHIPE. Stat, 1(1), 53-67.
See also
recombine, ddo, ddf, condDiv, rrDiv
Author
Ryan Hafen
Functions used in divide()
Description
Functions used in divide()
Usage
dfSplit(curDF, by, seed)
addSplitAttrs(curSplit, bsvFn, by, postTransFn = NULL)
Arguments
- curDF,seed
- arguments
- curSplit,bsvFn,by,postTransFn
- arguments
Division-Agnostic Aggregation
Description
Aggregates data by cross-classifying factors, with a formula interface similar to xtabs
Usage
drAggregate(formula, data = data, by = NULL, output = NULL, preTransFn = NULL, maxUnique = NULL, params = NULL, packages = NULL, control = NULL)
Arguments
- formula
- a formula object with the cross-classifying variables (separated by +) on the right-hand side (or an object which can be coerced to a formula). Interactions are not allowed. On the left-hand side, one may optionally give a variable name in the data representing counts; in the latter case, the columns are interpreted as corresponding to the levels of a variable. This is useful if the data have already been tabulated.
- data
- a "ddf" containing the variables in the formula
- by
- an optional variable name or vector of variable names by which to split up tabulations (i.e. tabulate independently inside of each unique "by" variable value). The only difference between specifying "by" and placing the variable(s) in the right-hand side of the formula is how the computation is done and how the result is returned.
- output
- a "kvConnection" object indicating where the output data should reside in the case of by being specified (see localDiskConn, hdfsConn). If NULL (default), output will be an in-memory "ddo" object.
- preTransFn
- an optional function to apply to each subset prior to performing tabulation. The output from this function should be a data frame containing variables with names that match those of the formula provided. Note: this is deprecated - instead use addTransform prior to calling divide.
- maxUnique
- the maximum number of unique combinations of variables to obtain tabulations for. This is meant to help in cases where a variable in the formula has a very large number of levels, to the point that it is not meaningful to tabulate and is too computationally burdensome. If NULL, it is ignored. If a positive number, only the top and bottom maxUnique tabulations by frequency are kept.
- params
- a named list of parameters external to the input data that are needed in the distributed computing (most should be taken care of automatically such that this is rarely necessary to specify)
- packages
- a vector of R package names that contain functions used in fn (most should be taken care of automatically such that this is rarely necessary to specify)
- control
- parameters specifying how the backend should handle things (most likely parameters to rhwatch in RHIPE) - see rhipeControl and localDiskControl
Value
a data frame of the tabulations. When "by" is specified, it is a ddf with each key-value pair corresponding to a unique "by" value, containing a data frame of tabulations.
Note
The interface is similar to xtabs, but instead of returning a full contingency table, data is returned in the form of a data frame only with rows for which there were positive counts. This result is more similar to what is returned by aggregate.
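For example, tabulating income category by sex across all subsets, regardless of how the data is divided (sketch, assuming datadr is installed):

```r
library(datadr)

byWork <- divide(ddf(adult), by = "workclass")

# division-agnostic counts of sex by income category
drAggregate(~ sex + income, data = byWork)
```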
See also
xtabs, updateAttributes
Author
Ryan Hafen
Bag of Little Bootstraps Transformation Method
Description
Bag of little bootstraps transformation method
Usage
drBLB(x, statistic, metric, R, n)
Arguments
- x
- a subset of a ddf
- statistic
- a function to apply to the subset specifying the statistic to compute. Must have arguments 'data' and 'weights' (see details). Must return a vector, where each element is a statistic of interest.
- metric
- a function specifying the metric to be applied to the R bootstrap samples of each statistic returned by statistic. Expects an input vector and should output a vector.
- R
- the number of bootstrap samples
- n
- the total number of observations in the data
Details
It is necessary to specify weights as a parameter to the statistic function because for BLB to work efficiently, it must resample each time with a sample of size n. To make this computationally possible for very large n, we can use weights (see reference for details). Therefore, only methods with a weights option can legitimately be used here.
References
Kleiner, A., Talwalkar, A., Sarkar, P. and Jordan, M. I. (2014). A scalable bootstrap for massive data. Journal of the Royal Statistical Society: Series B, 76(4), 795-816.
See also
divide, recombine
Author
Ryan Hafen
Filter a 'ddo' or 'ddf' Object
Description
Filter a 'ddo' or 'ddf' object
Usage
drFilter(x, filterFn, output = NULL, overwrite = FALSE, params = NULL, packages = NULL, control = NULL)
Arguments
- x
- an object of class 'ddo' or 'ddf'
- filterFn
- function that takes the keys and/or values and returns either TRUE or FALSE - if TRUE, that key-value pair will be present in the result
- output
- a "kvConnection" object indicating where the output data should reside (see localDiskConn, hdfsConn). If NULL (default), output will be an in-memory "ddo" object.
- overwrite
- logical; should an existing output location be overwritten? (can also specify overwrite = "backup" to move the existing output to _bak)
- params
- a named list of parameters external to the input data that are needed in the distributed computing (most should be taken care of automatically such that this is rarely necessary to specify)
- packages
- a vector of R package names that contain functions used in fn (most should be taken care of automatically such that this is rarely necessary to specify)
- control
- parameters specifying how the backend should handle things (most likely parameters to rhwatch in RHIPE) - see rhipeControl and localDiskControl
Value
a 'ddo' or 'ddf' object
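For example (sketch, assuming datadr is installed; the filter function here operates on subset values):

```r
library(datadr)

bySpecies <- divide(ddf(iris), by = "Species")

# keep only subsets whose mean sepal width exceeds 3
wideSepals <- drFilter(bySpecies, function(v) mean(v$Sepal.Width) > 3)
```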
See also
drJoin, drLapply
Author
Ryan Hafen
Get Global Variables and Package Dependencies
Description
Get global variables and package dependencies for a function
Usage
drGetGlobals(f)
Arguments
- f
- function
Value
a list of variables (named by variable) and a vector of package names
Details
This traverses the parent environments of the supplied function and finds all global variables using findGlobals and retrieves their values. All package function calls are also found and a list of required packages is also returned.
Author
Ryan Hafen
GLM Transformation Method
Description
GLM transformation method
Usage
drGLM(...)
Arguments
- ...
- arguments you would pass to the glm function
Details
This provides a transformation function to be called for each subset in a recombination MapReduce job that applies R's glm method and outputs the coefficients in a way that combMeanCoef knows how to deal with. It can be applied to a ddf with addTransform prior to calling recombine.
See also
divide, recombine, rrDiv
Author
Ryan Hafen
HexBin Aggregation for Distributed Data Frames
Description
Create a "hexbin" object of hexagonally binned data for a distributed data frame. This computation is division agnostic - it does not matter how the data frame is split up.
Usage
drHexbin(data, xVar, yVar, xTransFn = identity, yTransFn = identity, xbins = 30, shape = 1, params = NULL, packages = NULL, control = NULL)
Arguments
- data
- a distributed data frame
- xVar,yVar
- names of the variables to use
- xTransFn,yTransFn
- a transformation function to apply to the x and y variables prior to binning
- xbins
- the number of bins partitioning the range of xbnds
- shape
- the shape = yheight/xwidth of the plotting regions
- params
- a named list of parameters external to the input data that are needed in the distributed computing (most should be taken care of automatically such that this is rarely necessary to specify)
- packages
- a vector of R package names that contain functions used in fn (most should be taken care of automatically such that this is rarely necessary to specify)
- control
- parameters specifying how the backend should handle things (most likely parameters to rhwatch in RHIPE) - see rhipeControl and localDiskControl
Value
a "hexbin" object
References
Carr, D. B. et al. (1987). Scatterplot Matrix Techniques for Large N. JASA, 83(398), 424-436.
See also
drQuantile
Author
Ryan Hafen
Join Two Data Sources by Key
Description
Join two data sources by key
Usage
drJoin(..., output = NULL, overwrite = FALSE, postTransFn = NULL, params = NULL, packages = NULL, control = NULL)
Arguments
- output
- a "kvConnection" object indicating where the output data should reside (see localDiskConn, hdfsConn). If NULL (default), output will be an in-memory "ddo" object.
- overwrite
- logical; should an existing output location be overwritten? (can also specify overwrite = "backup" to move the existing output to _bak)
- postTransFn
- an optional function to be applied to each final key-value pair after joining
- params
- a named list of parameters external to the input data that are needed in the distributed computing (most should be taken care of automatically such that this is rarely necessary to specify)
- packages
- a vector of R package names that contain functions used in fn (most should be taken care of automatically such that this is rarely necessary to specify)
- control
- parameters specifying how the backend should handle things (most likely parameters to rhwatch in RHIPE) - see rhipeControl and localDiskControl
- ...
- named lists of input objects - it is assumed that all are of the same type (all HDFS, all localDisk, all in-memory)
Value
a 'ddo' object stored in the output connection, where the values are named lists with names according to the names given to the input data objects, and values are the corresponding data
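A sketch of joining two derived data sets by key (the names "dat" and "mean" are illustrative; assumes datadr is installed):

```r
library(datadr)

bySpecies <- divide(ddf(iris), by = "Species")
petalMeans <- drLapply(bySpecies, function(x) mean(x$Petal.Length))

# values of the result are lists with elements "dat" and "mean"
joined <- drJoin(dat = bySpecies, mean = petalMeans)
```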
See also
drFilter, drLapply
Author
Ryan Hafen
Apply a function to all key-value pairs of a ddo/ddf object
Description
Apply a function to all key-value pairs of a ddo/ddf object and get a new ddo object back, unless a different combine strategy is specified.
Usage
drLapply(X, FUN, combine = combDdo(), output = NULL, overwrite = FALSE, params = NULL, packages = NULL, control = NULL, verbose = TRUE)
Arguments
- X
- an object of class "ddo" or "ddf"
- FUN
- a function to be applied to each subset
- combine
- optional method to combine the results
- output
- a "kvConnection" object indicating where the output data should reside (see localDiskConn, hdfsConn). If NULL (default), output will be an in-memory "ddo" object.
- overwrite
- logical; should an existing output location be overwritten? (can also specify overwrite = "backup" to move the existing output to _bak)
- params
- a named list of parameters external to the input data that are needed in the distributed computing (most should be taken care of automatically such that this is rarely necessary to specify)
- packages
- a vector of R package names that contain functions used in fn (most should be taken care of automatically such that this is rarely necessary to specify)
- control
- parameters specifying how the backend should handle things (most likely parameters to rhwatch in RHIPE) - see rhipeControl and localDiskControl
- verbose
- logical - print messages about what is being done
Value
depends on combine
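For example (sketch, assuming datadr is installed):

```r
library(datadr)

bySpecies <- divide(ddf(iris), by = "Species")

# one value per subset, recombined into a single data frame
drLapply(bySpecies, function(x) mean(x$Petal.Width), combine = combRbind)
```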
See also
recombine, drFilter, drJoin, combDdo, combRbind
Author
Ryan Hafen
Sample Quantiles for 'ddf' Objects
Description
Compute sample quantiles for 'ddf' objects
Usage
drQuantile(x, var, by = NULL, probs = seq(0, 1, 0.005), preTransFn = NULL, varTransFn = identity, nBins = 10000, tails = 100, params = NULL, packages = NULL, control = NULL, ...)
Arguments
- x
- a 'ddf' object
- var
- the name of the variable to compute quantiles for
- by
- an optional variable name or vector of variable names by which to group quantile computations
- probs
- numeric vector of probabilities with values in [0, 1]
- preTransFn
- a transformation function (if desired) to apply to each subset prior to computing quantiles (here it may be useful for adding a "by" variable that is not present) - note: this transformation should not modify var (use varTransFn for that) - also note: this is deprecated - instead use addTransform prior to calling divide
- varTransFn
- transformation to apply to the variable prior to computing quantiles
- nBins
- how many bins the range of the variable should be split into
- tails
- how many exact values at each tail should be retained
- params
- a named list of parameters external to the input data that are needed in the distributed computing (most should be taken care of automatically such that this is rarely necessary to specify)
- packages
- a vector of R package names that contain functions used in fn (most should be taken care of automatically such that this is rarely necessary to specify)
- control
- parameters specifying how the backend should handle things (most likely parameters to rhwatch in RHIPE) - see rhipeControl and localDiskControl
- ...
- additional arguments
Value
a data frame of quantiles q and their associated f-values fval. If by is specified, then also a variable group.
Details
This division-agnostic quantile calculation algorithm takes the range of the variable of interest, splits it into nBins bins, tabulates counts for those bins, and reconstructs a quantile approximation from them. nBins should not get too large, but a larger nBins gives more accuracy. If tails is positive, the first and last tails ordered values are attached to the quantile estimate - this is useful for long-tailed distributions or distributions with outliers for which you would like more detail in the tails.
See also
updateAttributes
Author
Ryan Hafen
Data Input
Description
Reads a text file in table format and creates a distributed data frame from it, with cases corresponding to lines and variables to fields in the file.
Usage
drRead.table(file, header = FALSE, sep = "", quote = "\"'", dec = ".", skip = 0, fill = !blank.lines.skip, blank.lines.skip = TRUE, comment.char = "#", allowEscapes = FALSE, encoding = "unknown", autoColClasses = TRUE, rowsPerBlock = 50000, postTransFn = identity, output = NULL, overwrite = FALSE, params = NULL, packages = NULL, control = NULL, ...)
drRead.csv(file, header = TRUE, sep = ",", quote = "\"", dec = ".", fill = TRUE, comment.char = "", ...)
drRead.csv2(file, header = TRUE, sep = ";", quote = "\"", dec = ",", fill = TRUE, comment.char = "", ...)
drRead.delim(file, header = TRUE, sep = "\t", quote = "\"", dec = ".", fill = TRUE, comment.char = "", ...)
drRead.delim2(file, header = TRUE, sep = "\t", quote = "\"", dec = ",", fill = TRUE, comment.char = "", ...)
Arguments
- file
- input text file - can either be character string pointing to a file on local disk, or an
hdfsConn
object pointing to a text file on HDFS (seeoutput
argument below) - header
- this and parameters other parameters below are passed to
read.table
for each chunk being processed - seeread.table
for more info. Most all have defaults or appropriate defaults are set through other format-specific functions such asdrRead.csv
anddrRead.delim
. - sep
- see
read.table
for more info - quote
- see
read.table
for more info - dec
- see
read.table
for more info - skip
- see
read.table
for more info - fill
- see
read.table
for more info - blank.lines.skip
- see
read.table
for more info - comment.char
- see
read.table
for more info - allowEscapes
- see
read.table
for more info - encoding
- see
read.table
for more info - autoColClasses
- should column classes be determined automatically by reading in a sample? This can sometimes be problematic because of strange ways R handles quotes in
read.table
, but keeping the default ofTRUE
is advantageous for speed. - rowsPerBlock
- how many rows of the input file should make up a block (key-value pair) of output?
- postTransFn
- a function to be applied after a block is read in to provide any additional processing before the block is stored
- output
- a "kvConnection" object indicating where the output data should reside. Must be a
localDiskConn
object if input is a text file on local disk, or an hdfsConn
object if input is a text file on HDFS. - overwrite
- logical; should existing output location be overwritten? (also can specify
overwrite = "backup"
to move the existing output to _bak) - params
- a named list of parameters external to the input data that are needed in
postTransFn
- packages
- a vector of R package names that contain functions used in
fn
(most should be taken care of automatically such that this is rarely necessary to specify) - control
- parameters specifying how the backend should handle things (most likely parameters to
rhwatch
in RHIPE) - see rhipeControl
and localDiskControl
- ...
- see
read.table
for more info
Value
an object of class "ddf"
Note
For local disk, the file is actually read in sequentially instead of in parallel. This is because of possible performance issues when trying to read from the same disk in parallel.
Note that if skip
is positive and/or if header
is TRUE
, these lines are read in first since they occur only once in the data; each block is then checked for these lines, which are removed if they appear.
Also note that if you supply "Factor"
column classes, they will be converted to character.
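A minimal sketch of reading a CSV into a local-disk distributed data frame, assuming the signatures shown in Usage above (the temp-file paths are illustrative):

```r
library(datadr)

# write a small CSV purely for illustration
csvFile <- file.path(tempdir(), "iris.csv")
write.csv(iris, csvFile, row.names = FALSE)

# local text input requires a localDiskConn output;
# rowsPerBlock = 50 yields key-value pairs of 50 rows each
irisDdf <- drRead.csv(csvFile, rowsPerBlock = 50,
  output = localDiskConn(file.path(tempdir(), "irisDdf"), autoYes = TRUE))
```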
Author
Ryan Hafen
Take a Sample of Key-Value Pairs
Description
Take a sample of key-value pairs
Usage
drSample(x, fraction, output = NULL, overwrite = FALSE, control = NULL)
Arguments
- x
- a 'ddo' or 'ddf' object
- fraction
- fraction of key-value pairs to keep (between 0 and 1)
- output
- a "kvConnection" object indicating where the output data should reside (see
localDiskConn
,hdfsConn
). IfNULL
(default), output will be an in-memory "ddo" object. - overwrite
- logical; should existing output location be overwritten? (also can specify
overwrite = "backup"
to move the existing output to _bak) - control
- parameters specifying how the backend should handle things (most likely parameters to
rhwatch
in RHIPE) - see rhipeControl
and localDiskControl
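A sketch of drSample on an in-memory division (iris is illustrative):

```r
library(datadr)

# divide iris by species, then keep roughly half the key-value pairs
bySpecies <- divide(ddf(iris), by = "Species")
smp <- drSample(bySpecies, fraction = 0.5)
```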
Subsetting Distributed Data Frames
Description
Return a subset of a "ddf" object to memory
Usage
drSubset(data, subset = NULL, select = NULL, drop = FALSE, preTransFn = NULL, maxRows = 5e+05, params = NULL, packages = NULL, control = NULL, verbose = TRUE)
Arguments
- data
- object to be subsetted -- an object of class "ddf" or "ddo" - in the latter case, need to specify
preTransFn
to coerce each subset into a data frame - subset
- logical expression indicating elements or rows to keep: missing values are taken as false
- select
- expression, indicating columns to select from a data frame
- drop
- passed on to [ indexing operator
- preTransFn
- a transformation function (if desired) to be applied to each subset prior to division - note: this is deprecated - instead use
addTransform
prior to calling divide - maxRows
- the maximum number of rows to return
- params
- a named list of parameters external to the input data that are needed in the distributed computing (most should be taken care of automatically such that this is rarely necessary to specify)
- packages
- a vector of R package names that contain functions used in
fn
(most should be taken care of automatically such that this is rarely necessary to specify) - control
- parameters specifying how the backend should handle things (most likely parameters to
rhwatch
in RHIPE) - see rhipeControl
and localDiskControl
- verbose
- logical - print messages about what is being done
Value
data frame
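For example, pulling a filtered subset back into memory might look like this (a sketch; iris is illustrative):

```r
library(datadr)

bySpecies <- divide(ddf(iris), by = "Species")
# rows with Sepal.Length > 6, returned as an ordinary data frame
big <- drSubset(bySpecies, subset = Sepal.Length > 6)
```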
Author
Ryan Hafen
"Flatten" a ddf Subset
Description
Add split variables and BSVs (if any) as columns to a subset of a ddf.
Usage
flatten(x)
Arguments
- x
- a value of a key-value pair
See also
getSplitVars
, getBsvs
Get Between Subset Variable
Description
For a given key-value pair, get a BSV variable value by name (if present)
Usage
getBsv(x, name)
Arguments
- x
- a key-value pair or a value
- name
- the name of the BSV to get
Get Between Subset Variables
Description
For a given key-value pair, extract all BSVs
Usage
getBsvs(x)
Arguments
- x
- a key-value pair or a value
Get names of the conditioning variable cuts
Description
Used internally but exported for certain reasons. Do not use explicitly.
Usage
getCondCuts(df, splitVars)
Arguments
- df
- a data frame
- splitVars
- a vector of variable names to split by
Extract "Split" Variable
Description
For a given key-value pair or value, get a split variable value by name, if present (split variables are variables that define how the data was divided).
Usage
getSplitVar(x, name)
Arguments
- x
- a key-value pair or a value
- name
- the name of the split variable to get
Extract "Split" Variables
Description
For a given k/v pair or value, extract all split variables (split variables are variables that define how the data was divided).
Usage
getSplitVars(x)
Arguments
- x
- a key-value pair or a value
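A sketch of how the split-variable helpers above might be used on a divided object; the subscript-based extraction of a single value is an assumption about this version's API:

```r
library(datadr)

bySpecies <- divide(ddf(iris), by = "Species")
v <- bySpecies[[1]][[2]]     # value of the first key-value pair (assumed API)
getSplitVar(v, "Species")    # value of one split variable for this subset
getSplitVars(v)              # all split variables
flatten(v)                   # split variables added back as columns
```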
Connect to Data Source on HDFS
Description
Connect to a data source on HDFS
Usage
hdfsConn(loc, type = "sequence", autoYes = FALSE, reset = FALSE, verbose = TRUE)
Arguments
- loc
- location on HDFS for the data source
- type
- the type of data ("map", "sequence", "text")
- autoYes
- automatically answer "yes" to questions about creating a path on HDFS
- reset
- should existing metadata for this object be overwritten?
- verbose
- logical - print messages about what is being done
Value
a "kvConnection" object of class "hdfsConn"
Details
This simply creates a "connection" to a directory on HDFS (which need not have data in it). To actually do things with this data, see ddo
, etc.
See also
addData
, ddo
, ddf
, localDiskConn
Author
Ryan Hafen
Apply Function to Key-Value Pair
Description
Apply a function to a single key-value pair - not a traditional R "apply" function.
Usage
kvApply(fn, kvPair, returnKV = FALSE)
Arguments
- fn
- a function
- kvPair
- a key-value pair (a list with 2 elements)
- returnKV
- should the key and value be returned?
Details
Determines how a function should be applied to a key-value pair and then applies it: if the function has two formals, it applies the function giving it the key and the value as the arguments; if the function has one formal, it applies the function giving it just the value. This provides flexibility and simplicity for when a function is only meant to be applied to the value, but still allows keys to be used if desired.
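The one-formal versus two-formal behavior described above can be sketched as:

```r
library(datadr)

kv <- list("setosa", subset(iris, Species == "setosa"))
# one formal: the function receives only the value
kvApply(function(v) nrow(v), kv)
# two formals: the function receives the key and the value
kvApply(function(k, v) paste(k, nrow(v)), kv)
```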
Manually Specify Key-Value Pairs
Description
Manually specify key-value pairs
Usage
kvPairs(...)
Arguments
- ...
- key-value pairs (lists with two elements)
Value
a list of objects of class "kvPair"
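For instance (a sketch):

```r
library(datadr)

# two key-value pairs specified manually
kvp <- kvPairs(list("a", 1:10), list("b", letters))
# which can back an in-memory distributed data object
d <- ddo(kvp)
```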
Author
Ryan Hafen
Connect to Data Source on Local Disk
Description
Connect to a data source on local disk
Usage
localDiskConn(loc, nBins = 0, fileHashFn = NULL, autoYes = FALSE, reset = FALSE, verbose = TRUE)
Arguments
- loc
- location on local disk for the data source
- nBins
- number of bins (subdirectories) to put data files into - if anticipating a large number of k/v pairs, it is a good idea to set this to something bigger than 0
- fileHashFn
- an optional function that operates on each key-value pair to determine the subdirectory structure for where the data should be stored for that subset, or can be specified "asis" when keys are scalar strings
- autoYes
- automatically answer "yes" to questions about creating a path on local disk
- reset
- should existing metadata for this object be overwritten?
- verbose
- logical - print messages about what is being done
Value
a "kvConnection" object of class "localDiskConn"
Details
This simply creates a "connection" to a directory on local disk (which need not have data in it). To actually do things with this connection, see ddo
, etc. Typically, you should just use loc
to specify where the data is or where you would like data for this connection to be stored. Metadata for the object is also stored in this directory.
See also
addData
, ddo
, ddf
, localDiskConn
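A sketch of creating a local-disk connection, adding a key-value pair, and wrapping it as a ddo (the temp path is illustrative):

```r
library(datadr)

# create (or connect to) a directory-backed key-value store
conn <- localDiskConn(file.path(tempdir(), "irisKV"), autoYes = TRUE)
addData(conn, list(list("setosa", subset(iris, Species == "setosa"))))
d <- ddo(conn)
```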
Author
Ryan Hafen
Specify Control Parameters for MapReduce on a Local Disk Connection
Description
Specify control parameters for a MapReduce on a local disk connection. Currently the parameters include:
Usage
localDiskControl(cluster = NULL, map_buff_size_bytes = 10485760, reduce_buff_size_bytes = 10485760, map_temp_buff_size_bytes = 10485760)
Arguments
- cluster
- a "cluster" object obtained from
makeCluster
to allow for parallel processing - map_buff_size_bytes
- determines how much data should be sent to each map task
- reduce_buff_size_bytes
- determines how much data should be sent to each reduce task
- map_temp_buff_size_bytes
- determines the size of chunks written to disk in between the map and reduce
Note
If you have data on a shared drive that multiple nodes can access or a high performance shared file system like Lustre, you can run a local disk MapReduce job on multiple nodes by creating a multi-node cluster with makeCluster
.
If you are using multiple cores and the input data is very small, map_buff_size_bytes
needs to be small so that the key-value pairs will be split across cores.
Take a ddo/ddf HDFS data object and turn it into a mapfile
Description
Take a ddo/ddf HDFS data object and turn it into a mapfile
Usage
makeExtractable(obj)
Arguments
- obj
- object of class 'ddo' or 'ddf' with an HDFS connection
Execute a MapReduce Job
Description
Execute a MapReduce job
Usage
mrExec(data, setup = NULL, map = NULL, reduce = NULL, output = NULL, overwrite = FALSE, control = NULL, params = NULL, packages = NULL, verbose = TRUE)
Arguments
- data
- a ddo/ddf object, or list of ddo/ddf objects
- setup
- an expression of R code (created using the R command
expression
) to be run before map and reduce - map
- an R expression that is evaluated during the map stage. For each task, this expression is executed multiple times (see details).
- reduce
- a vector of R expressions with names pre, reduce, and post that is evaluated during the reduce stage. For example
reduce = expression(pre = {...}, reduce = {...}, post = {...})
. reduce is optional; if it is not specified, a default identity reduce is performed, so the map output key-value pairs will be the result. Setting it to 0 will skip the reduce altogether. - output
- a "kvConnection" object indicating where the output data should reside (see
localDiskConn
,hdfsConn
). IfNULL
(default), output will be an in-memory "ddo" object. - overwrite
- logical; should existing output location be overwritten? (also can specify
overwrite = "backup"
to move the existing output to _bak) - control
- parameters specifying how the backend should handle things (most likely parameters to
rhwatch
in RHIPE) - see rhipeControl
and localDiskControl
- params
- a named list of parameters external to the input data that are needed in the map or reduce phases
- packages
- a vector of R package names that contain functions used in
fn
(most should be taken care of automatically such that this is rarely necessary to specify) - verbose
- logical - print messages about what is being done
Value
a "ddo" object - to keep it simple; it is up to the user to update or cast as "ddf" if that is the desired result.
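A sketch of a complete job, using the map.values / collect / reduce.values / reduce.key conventions of datadr's MapReduce interface to count the total number of rows:

```r
library(datadr)

bySpecies <- divide(ddf(iris), by = "Species")

map <- expression({
  # map.values holds the values for the current chunk of input pairs
  for (v in map.values)
    collect("nrow", nrow(v))
})
reduce <- expression(
  pre    = { total <- 0 },
  reduce = { total <- total + sum(unlist(reduce.values)) },
  post   = { collect(reduce.key, total) }
)
res <- mrExec(bySpecies, map = map, reduce = reduce)
```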
Author
Ryan Hafen
Functions to Compute Summary Statistics in MapReduce
Description
Functions that are used to tabulate categorical variables and compute moments for numeric variables through the MapReduce framework. Used in updateAttributes
.
Usage
tabulateMap(formula, data)
tabulateReduce(result, reduce.values, maxUnique = NULL)
calculateMoments(y, order = 1, na.rm = TRUE)
combineMoments(m1, m2)
combineMultipleMoments(...)
moments2statistics(m)
Arguments
- formula
- a formula to be used in
xtabs
- data
- a subset of a 'ddf' object
- result,reduce.values
- inconsequential
tabulateReduce
parameters - maxUnique
- the maximum number of unique combinations of variables to obtain tabulations for. This is meant to help against cases where a variable in the formula has a very large number of levels, to the point that it is not meaningful to tabulate and is too computationally burdensome. If
NULL
, it is ignored. If a positive number, only the top and bottommaxUnique
tabulations by frequency are kept. - y,order,na.rm
- inconsequential
calculateMoments
parameters - m1,m2
- inconsequential
combineMoments
parameters - m
- inconsequential
moments2statistics
parameters - ...
- inconsequential parameters
Print a "ddo" or "ddf" Object
Description
Print an overview of attributes of distributed data objects (ddo) or distributed data frames (ddf)
Usage
"print"(x, ...)
Arguments
- x
- object to be printed
- ...
- additional arguments
Author
Ryan HafenPrint a key-value pair
Description
Print a key-value pair
Usage
"print"(x, ...)
Arguments
- x
- object to be printed
- ...
- additional arguments
Print value of a key-value pair
Description
Print value of a key-value pair
Usage
"print"(x, ...)
Arguments
- x
- object to be printed
- ...
- additional arguments
Experimental HDFS text reader helper function
Description
Experimental helper function for reading text data on HDFS into an HDFS connection
Usage
readHDFStextFile(input, output = NULL, overwrite = FALSE, fn = NULL, keyFn = NULL, linesPerBlock = 10000, control = NULL, update = FALSE)
Arguments
- input
- a RHIPE input text handle created with
rhfmt
- output
- an output connection such as those created with
localDiskConn
, andhdfsConn
- overwrite
- logical; should existing output location be overwritten? (also can specify
overwrite = "backup"
to move the existing output to _bak) - fn
- function to be applied to each chunk of lines (input to function is a vector of strings)
- keyFn
- optional function to determine the value of the key for each block
- linesPerBlock
- how many lines at a time to read
- control
- parameters specifying how the backend should handle things (most likely parameters to
rhwatch
in RHIPE) - see rhipeControl
and localDiskControl
- update
- should a MapReduce job be run to obtain additional attributes for the result data prior to returning?
Experimental sequential text reader helper function
Description
Experimental helper function for reading text data sequentially from a file on disk and adding it to a connection using addData
Usage
readTextFileByChunk(input, output, overwrite = FALSE, linesPerBlock = 10000, fn = NULL, header = TRUE, skip = 0, recordEndRegex = NULL, cl = NULL)
Arguments
- input
- the path to an input text file
- output
- an output connection such as those created with
localDiskConn
, andhdfsConn
- overwrite
- logical; should existing output location be overwritten? (also can specify
overwrite = "backup"
to move the existing output to _bak) - linesPerBlock
- how many lines at a time to read
- fn
- function to be applied to each chunk of lines (see details)
- header
- does the file have a header
- skip
- number of lines to skip before reading
- recordEndRegex
- an optional regular expression that finds lines in the text file that indicate the end of a record (for multi-line records)
- cl
- a "cluster" object to be used for parallel processing, created using
makeCluster
Details
The function fn
should have one argument, which should expect to receive a vector of strings, each element of which is a line in the file. It is also possible for fn
to take two arguments, in which case the second argument is the header line from the file (some parsing methods might need to know the header).
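A sketch of such an fn (the name parseChunk is hypothetical):

```r
# first argument: a character vector of lines from the file;
# optional second argument: the header line from the file
parseChunk <- function(lines, header) {
  read.csv(textConnection(c(header, lines)), stringsAsFactors = FALSE)
}
```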
Recombine
Description
Apply an analytic recombination method to a ddo/ddf object and combine the results
Usage
recombine(data, combine = NULL, apply = NULL, output = NULL, overwrite = FALSE, params = NULL, packages = NULL, control = NULL, verbose = TRUE)
Arguments
- data
- an object of class "ddo" or "ddf"
- combine
- the method to combine the results
- apply
- a function specifying the analytic method to apply to each subset, or a pre-defined apply function (see
drBLB
,drGLM
, for example) - output
- a "kvConnection" object indicating where the output data should reside (see
localDiskConn
,hdfsConn
). IfNULL
(default), output will be an in-memory "ddo" object. - overwrite
- logical; should existing output location be overwritten? (also can specify
overwrite = "backup"
to move the existing output to _bak) - params
- a named list of parameters external to the input data that are needed in the distributed computing (most should be taken care of automatically such that this is rarely necessary to specify)
- packages
- a vector of R package names that contain functions used in
fn
(most should be taken care of automatically such that this is rarely necessary to specify) - control
- parameters specifying how the backend should handle things (most likely parameters to
rhwatch
in RHIPE) - see rhipeControl
and localDiskControl
- verbose
- logical - print messages about what is being done
Value
depends on combine
References
- http://www.datadr.org
- Guha, S., Hafen, R., Rounds, J., Xia, J., Li, J., Xi, B., & Cleveland, W. S. (2012). Large complex data: divide and recombine (D&R) with RHIPE. Stat, 1(1), 53-67.
See also
divide
, ddo
, ddf
, drGLM
, drBLB
, combMeanCoef
, combMean
, combCollect
, combRbind
, drLapply
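A sketch of a divide / transform / recombine round trip (iris is illustrative):

```r
library(datadr)

bySpecies <- divide(ddf(iris), by = "Species")
# per-subset summary, then row-bind the per-subset results
means <- recombine(
  addTransform(bySpecies,
    function(v) data.frame(meanSL = mean(v$Sepal.Length))),
  combine = combRbind)
```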
Author
Ryan Hafen
Remove Key-Value Pairs from a Data Connection
Description
Remove key-value pairs from a data connection
Usage
removeData(conn, keys)
Arguments
- conn
- a kvConnection object
- keys
- a list of keys indicating which k/v pairs to remove
Note
This is generally not recommended for HDFS as it writes a new file each time it is called, and can result in more individual files than Hadoop likes to deal with.
See also
addData
, localDiskConn
, hdfsConn
Author
Ryan Hafen
Specify Control Parameters for RHIPE Job
Description
Specify control parameters for a RHIPE job. See rhwatch
for details about each of the parameters.
Usage
rhipeControl(mapred = NULL, setup = NULL, combiner = FALSE, cleanup = NULL, orderby = "bytes", shared = NULL, jarfiles = NULL, zips = NULL, jobname = "")
Arguments
- mapred,setup,combiner,cleanup,orderby,shared,jarfiles,zips,jobname
- arguments to
rhwatch
in RHIPE
Random Replicate Division
Description
Specify random replicate division parameters for data division
Usage
rrDiv(nrows = NULL, seed = NULL)
Arguments
- nrows
- number of rows each subset should have
- seed
- the random seed to use (experimental)
Value
a list to be used for the "by" argument to divide
Details
The random replicate division method currently gets the total number of rows of the input data and divides it by nrows
to get the number of subsets. Then it randomly assigns each row of the input data to one of the subsets, resulting in subsets with approximately nrows
rows. A future implementation will make each subset have exactly nrows
rows.
References
- http://www.datadr.org
- Guha, S., Hafen, R., Rounds, J., Xia, J., Li, J., Xi, B., & Cleveland, W. S. (2012). Large complex data: divide and recombine (D&R) with RHIPE. Stat, 1(1), 53-67.
See also
divide
, recombine
, condDiv
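For example (a sketch; iris is illustrative):

```r
library(datadr)

# random replicate division into subsets of roughly 30 rows each
rr <- divide(ddf(iris), by = rrDiv(nrows = 30))
```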
Author
Ryan Hafen
Set up transformation environment
Description
This is called internally in the map phase of datadr MapReduce jobs. It is not meant for use outside of there, but is exported for convenience. Given an environment and collection of transformations, it populates the environment with the global variables in the transformations.
Usage
setupTransformEnv(transFns, env = NULL)
Arguments
- transFns
- from the "transforms" attribute of a ddo object
- env
- the environment in which to evaluate the transformations
Specify Control Parameters for Spark Job
Description
Specify control parameters for a Spark job. See rhwatch
for details about each of the parameters.
Usage
sparkControl()
Connect to Spark Data Source
Description
Connect to a Spark data source, potentially on HDFS
Usage
sparkDataConn(loc, type = "object", hdfs = FALSE, init = list(), autoYes = FALSE, reset = FALSE, verbose = TRUE)
Arguments
- loc
- location on the file system (typically HDFS) for the data source
- type
- is it a "text" file or "object" (default) file?
- hdfs
- is the file on HDFS?
- init
- if a SparkContext has not been initialized with
sparkR.init
, a named list of arguments to be passed tosparkR.init
to initialize a SparkContext - autoYes
- automatically answer "yes" to questions about creating a path if missing
- reset
- should existing metadata for this object be overwritten?
- verbose
- logical - print messages about what is being done
Value
a "kvConnection" object of class "sparkDataConn"
Details
This simply creates a "connection" to a directory (which need not have data in it). To actually do things with this data, see ddo
, etc.
See also
addData
, ddo
, ddf
, sparkDataConn
Author
Ryan Hafen
Update Attributes of a 'ddo' or 'ddf' Object
Description
Update attributes of a 'ddo' or 'ddf' object
Usage
updateAttributes(obj, control = NULL)
Arguments
- obj
- an object of class 'ddo' or 'ddf'
- control
- parameters specifying how the backend should handle things (most likely parameters to
rhwatch
in RHIPE) - see rhipeControl
Value
an object of class 'ddo' or 'ddf'
Details
This function looks for missing attributes related to a ddo or ddf (distributed data object or data frame) object and runs MapReduce to update them. These attributes include "splitSizeDistn", "keys", "nDiv", "nRow", and "splitRowDistn". These attributes are useful for subsequent computations that might rely on them. The result is the input modified to reflect the updated attributes, and thus it should be used as obj <- updateAttributes(obj)
.
References
Bennett, Janine, et al. "Numerically stable, single-pass, parallel statistics algorithms." Cluster Computing and Workshops, 2009. CLUSTER'09. IEEE International Conference on. IEEE, 2009.
See also
ddo
, ddf
, divide
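Following the obj <- updateAttributes(obj) idiom described above (a sketch; iris is illustrative):

```r
library(datadr)

bySpecies <- divide(ddf(iris), by = "Species")
# compute missing attributes ("keys", "nRow", etc.) and assign back
bySpecies <- updateAttributes(bySpecies)
nrow(bySpecies)
```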