DeltaRho

datadr R function reference


Divide and Recombine for Large, Complex Data

Author: Ryan Hafen

Version: 0.7.5

Date: 2015-01-06

License: BSD_3_clause + file LICENSE

Description

Methods for dividing data into subsets, applying analytical methods to the subsets, and recombining the results. Comes with a generic MapReduce interface as well. Works with key-value pairs stored in memory, on local disk, or on HDFS, in the latter case using the R and Hadoop Integrated Programming Environment (RHIPE). Also includes experimental support for key-value pairs stored as Spark RDDs using the SparkR package.

Depends

R (>= 2.15.1)

Suggests

testthat (>= 0.8.1), roxygen2 (>= 4.0.0), Rhipe, SparkR

Add Key-Value Pairs to a Data Connection

Description

Add key-value pairs to a data connection

Usage

addData(conn, data, overwrite = FALSE)

Arguments

conn
a kvConnection object
data
a list of key-value pairs (list of lists where each sub-list has two elements, the key and the value)
overwrite
if data with the same key is already present in the data, should it be overwritten? (does not work for HDFS connections)

Note

This is generally not recommended for HDFS as it writes a new file each time it is called, and can result in more individual files than Hadoop likes to deal with.
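As a minimal sketch of the structure the data argument expects, the base R snippet below builds a list of two key-value pairs. The keys and values are made up for illustration, and the commented call assumes a kvConnection object named conn already exists.

```r
# Each key-value pair is a two-element list: list(key, value).
kv <- list(
  list("setosa", data.frame(x = 1:3)),
  list("virginica", data.frame(x = 4:6))
)

# Check the shape: every sub-list has exactly two elements.
well_formed <- all(vapply(kv, length, integer(1)) == 2L)

# With datadr loaded and a kvConnection object `conn` (hypothetical here),
# the pairs would be added with:
# addData(conn, kv)
```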

See also

removeData, localDiskConn, hdfsConn

Author

Ryan Hafen

Add a Transformation Function to a Distributed Data Object

Description

Add a transformation function to be applied to each subset of a distributed data object

Usage

addTransform(obj, fn, name = NULL, params = NULL, packages = NULL)

Arguments

obj
a distributed data object
fn
a function to be applied to each subset of obj
name
optional name of the transformation
params
a named list of parameters external to obj that are needed in the transformation function (most should be taken care of automatically such that this is rarely necessary to specify)
packages
a vector of R package names that contain functions used in fn (most should be taken care of automatically such that this is rarely necessary to specify)

Details

When you add a transformation to a distributed data object, the transformation is not applied immediately; it is deferred until a function that kicks off a computation is called. These functions include divide, recombine, drJoin, drLapply, drFilter, drSample, and drSubset. When any of them is invoked on an object with a transformation attached to it, the transformation is applied in the map phase, before any other computation. The transformation is also applied any time a subset of the data is requested. Thus, although the data has not been physically transformed after a call to addTransform, we can think of it conceptually as already being transformed.

When addTransform is called, the transformation is tested on a subset of the data to make sure that all of the global variables and packages needed to perform it portably are available.

It is possible to add multiple transformations to a distributed data object, in which case they are applied in the order supplied, but only one transform should be necessary.
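The deferred behaviour described above can be illustrated with a toy base R stand-in. This is only a sketch of the idea, not datadr's implementation: the list structure and the helpers add_transform and get_subset are hypothetical names introduced for the example.

```r
# Toy stand-in for a distributed data object: subsets plus pending transforms.
obj <- list(data = split(iris, iris$Species), transforms = list())

# Attaching a transformation only records it; the stored subsets are untouched.
add_transform <- function(obj, fn) {
  obj$transforms <- c(obj$transforms, list(fn))
  obj
}

# Requesting a subset applies the recorded transformations in the order supplied.
get_subset <- function(obj, key) {
  x <- obj$data[[key]]
  for (fn in obj$transforms) x <- fn(x)
  x
}

transformed <- add_transform(obj, function(x) mean(x$Sepal.Length))

# The underlying data is unchanged, yet a requested subset looks transformed.
stored_unchanged <- identical(transformed$data, obj$data)
setosa_mean <- get_subset(transformed, "setosa")
```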

"Census Income" Dataset

Description

"Census Income" dataset from UCI machine learning repository

Usage

adult

Format

(From UCI machine learning repository)

  • age. continuous
  • workclass. Private, Self-emp-not-inc, Self-emp-inc, Federal-gov, Local-gov, State-gov, Without-pay, Never-worked
  • fnlwgt. continuous
  • education. Bachelors, Some-college, 11th, HS-grad, Prof-school, Assoc-acdm, Assoc-voc, 9th, 7th-8th, 12th, Masters, 1st-4th, 10th, Doctorate, 5th-6th, Preschool
  • education-num. continuous
  • marital. Married-civ-spouse, Divorced, Never-married, Separated, Widowed, Married-spouse-absent, Married-AF-spouse
  • occupation. Tech-support, Craft-repair, Other-service, Sales, Exec-managerial, Prof-specialty, Handlers-cleaners, Machine-op-inspct, Adm-clerical, Farming-fishing, Transport-moving, Priv-house-serv, Protective-serv, Armed-Forces
  • relationship. Wife, Own-child, Husband, Not-in-family, Other-relative, Unmarried
  • race. White, Asian-Pac-Islander, Amer-Indian-Eskimo, Other, Black
  • sex. Female, Male
  • capgain. continuous
  • caploss. continuous
  • hoursperweek. continuous
  • nativecountry. United-States, Cambodia, England, Puerto-Rico, Canada, Germany, Outlying-US(Guam-USVI-etc), India, Japan, Greece, South, China, Cuba, Iran, Honduras, Philippines, Italy, Poland, Jamaica, Vietnam, Mexico, Portugal, Ireland, France, Dominican-Republic, Laos, Ecuador, Taiwan, Haiti, Columbia, Hungary, Guatemala, Nicaragua, Scotland, Thailand, Yugoslavia, El-Salvador, Trinadad&Tobago, Peru, Hong, Holand-Netherlands
  • income. <=50K, >50K
  • incomebin. 0 if income<=50K, 1 if income>50K

References

Bache, K. & Lichman, M. (2013). UCI Machine Learning Repository [http://archive.ics.uci.edu/ml]. Irvine, CA: University of California, School of Information and Computer Science.

Applies the transformation function(s)

Description

This is called internally in the map phase of datadr MapReduce jobs. It is not meant to be used elsewhere, but is exported for convenience.

Usage

applyTransform(transFns, x, env = NULL)

Arguments

transFns
from the "transforms" attribute of a ddo object
x
a subset of the object
env
the environment in which to evaluate the function (should be instantiated from calling setupTransformEnv) - if NULL, the environment will be set up for you

Turn 'ddf' Object into Data Frame

Description

Rbind all the rows of a 'ddf' object into a single data frame

Usage

"as.data.frame"(x, row.names = NULL, optional = FALSE, keys = TRUE, splitVars = TRUE, bsvs = FALSE, ...)

Arguments

x
a 'ddf' object
row.names
passed to as.data.frame
optional
passed to as.data.frame
keys
should the key be added as a variable in the resulting data frame? (if the key is not a character, it will be replaced with an MD5 hash)
splitVars
should the values of the splitVars be added as variables in the resulting data frame?
bsvs
should the values of bsvs be added as variables in the resulting data frame?
...
additional arguments passed to as.data.frame

Turn 'ddo' / 'ddf' Object into a list

Description

Turn 'ddo' / 'ddf' Object into a list

Usage

"as.list"(x, ...)

Arguments

x
a 'ddo' / 'ddf' object
...
additional arguments passed to as.list

Construct Between Subset Variable (BSV)

Description

Construct between subset variable (BSV)

Usage

bsv(val = NULL, desc = "")

Arguments

val
a scalar character, numeric, or date
desc
a character string describing the BSV

Details

Should be called inside the bsvFn argument to divide used for constructing a BSV list for each subset of a division.

See also

divide, getBsvs, bsvInfo

Author

Ryan Hafen

Character File Hash Function

Description

Function to be used to specify the file where key-value pairs get stored for local disk connections, useful when keys are scalar strings. Should be passed as the argument fileHashFn to localDiskConn.

Usage

charFileHash(keys, conn)

Arguments

keys
keys to be hashed
conn
a "localDiskConn" object

Details

You shouldn't need to call this directly other than to experiment with what the output looks like or to get ideas on how to write your own custom hash.

See also

localDiskConn, digestFileHash

Author

Ryan Hafen

"Collect" Recombination

Description

"Collect" recombination - simply collect the results into a local list of key-value pairs

Usage

combCollect(...)

Arguments

...
...

Details

This is an experimental prototype. It is to be passed as the argument combine to recombine.

See also

divide, recombine, combDdo, combMeanCoef, combRbind, combMean

Author

Ryan Hafen

"DDO" Recombination

Description

"DDO" recombination - simply collect the results into a "ddo" object

Usage

combDdo(...)

Arguments

...
...

Details

This is an experimental prototype. It is to be passed as the argument combine to recombine.

See also

divide, recombine, combCollect, combMeanCoef, combRbind, combMean

Author

Ryan Hafen

Mean Recombination

Description

Mean recombination

Usage

combMean(...)

Arguments

...
...

Details

This is an experimental prototype. It is to be passed as the argument combine to recombine.

See also

divide, recombine, combCollect, combDdo, combRbind, combMeanCoef

Author

Ryan Hafen

Mean Coefficient Recombination

Description

Mean coefficient recombination

Usage

combMeanCoef(...)

Arguments

...
...

Details

This is an experimental prototype. It is to be passed as the argument combine to recombine. It expects to be dealing with named vectors including an element n specifying the number of rows in that subset.
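To make the expected input concrete, here is a base R sketch of per-subset results shaped as named vectors with an element n, followed by one plausible way to recombine them. The n-weighted average is an assumption about how the n element is used, and fit_subset and combine_mean_coef are hypothetical helper names, not part of datadr.

```r
# Per-subset fit returning a named vector of coefficients plus n.
fit_subset <- function(d) {
  c(coef(lm(mpg ~ wt, data = d)), n = nrow(d))
}

subsets <- split(mtcars, mtcars$cyl)
fits <- lapply(subsets, fit_subset)

# Assumed recombination: average the coefficients, weighting each subset by n.
combine_mean_coef <- function(fits) {
  n <- vapply(fits, function(f) f[["n"]], numeric(1))
  coefs <- do.call(rbind, lapply(fits, function(f) f[names(f) != "n"]))
  # Row i of coefs is multiplied by n[i] before the column sums are taken.
  colSums(coefs * n) / sum(n)
}

combined <- combine_mean_coef(fits)
```

With an intercept-only model, each subset coefficient is the subset mean, so the weighted average reduces to the overall mean of the response.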

See also

divide, recombine, rrDiv, combCollect, combDdo, combRbind, combMean

Author

Ryan Hafen

"rbind" Recombination

Description

"rbind" recombination

Usage

combRbind(...)

Arguments

...
...

Details

This is an experimental prototype. It is to be passed as the argument combine to recombine.

See also

divide, recombine, combDdo, combCollect, combMeanCoef, combMean

Author

Ryan Hafen

Conditioning Variable Division

Description

Specify conditioning variable division parameters for data division

Usage

condDiv(vars)

Arguments

vars
a character string or vector of character strings specifying the variables of the input data across which to divide

Value

a list to be used for the "by" argument to divide

Details

Currently each unique combination of values of vars constitutes a subset. In the future, specifying shingles for numeric conditioning variables will be implemented.
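The notion that each unique combination of values of vars forms a subset can be sketched in base R with split. This illustrates the concept on an in-memory data frame and is not how datadr performs the division; the variable names come from the built-in mtcars data.

```r
vars <- c("cyl", "gear")

# One subset per combination of cyl and gear that actually occurs in the data.
subsets <- split(mtcars, mtcars[vars], drop = TRUE)

n_subsets <- length(subsets)
n_combos <- nrow(unique(mtcars[vars]))
```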

See also

divide, getSplitVars, getSplitVar

Author

Ryan Hafen

Convert 'ddo' / 'ddf' Objects

Description

Convert 'ddo' / 'ddf' objects between different storage backends

Usage

convert(from, to)

Arguments

from
a 'ddo' or 'ddf' object
to
a 'kvConnection' object (created with localDiskConn or hdfsConn) or NULL if an in-memory 'ddo' / 'ddf' is desired

datadr.

Description

datadr.

'ddf' accessors

Description

'ddf' accessors: the number of rows/columns of a 'ddf' object

Usage

nrow(x)
NROW(x)
ncol(x)
NCOL(x)
"nrow"(x)
"NROW"(x)
"ncol"(x)
"NCOL"(x)

Arguments

x
a 'ddf' object

Instantiate a Distributed Data Frame ('ddf')

Description

Instantiate a distributed data frame ('ddf')

Usage

ddf(conn, transFn = NULL, update = FALSE, reset = FALSE, control = NULL, verbose = TRUE)

Arguments

conn
an object pointing to where data is or will be stored for the 'ddf' object - can be a 'kvConnection' object created from localDiskConn or hdfsConn, or a data frame or list of key-value pairs
transFn
a function to be applied to the key-value pairs of this data prior to doing any processing, that transforms the data into a data frame if it is not stored as such
update
should the attributes of this object be updated? See updateAttributes for more details.
reset
should all persistent metadata about this object be removed and the object created from scratch? This setting does not affect data stored in the connection location.
control
parameters specifying how the backend should handle things if attributes are updated (most-likely parameters to rhwatch in RHIPE) - see rhipeControl and localDiskControl
verbose
logical - print messages about what is being done

Accessor Functions

Description

Accessor functions for attributes of 'ddo' and 'ddf' objects. Methods also include nrow and ncol for 'ddf' objects.

Usage

kvExample(x)
bsvInfo(x)
counters(x)
splitSizeDistn(x)
splitRowDistn(x)
getKeys(x)
"summary"(object, ...)
hasExtractableKV(x)
"names"(x)
"length"(x)

Arguments

x
a 'ddf'/'ddo' object
object
a 'ddf'/'ddo' object
...
additional arguments

Instantiate a Distributed Data Object ('ddo')

Description

Instantiate a distributed data object ('ddo')

Usage

ddo(conn, update = FALSE, reset = FALSE, control = NULL, verbose = TRUE)

Arguments

conn
an object pointing to where data is or will be stored for the 'ddo' object - can be a 'kvConnection' object created from localDiskConn or hdfsConn, or a data frame or list of key-value pairs
update
should the attributes of this object be updated? See updateAttributes for more details.
reset
should all persistent metadata about this object be removed and the object created from scratch? This setting does not affect data stored in the connection location.
control
parameters specifying how the backend should handle things if attributes are updated (most-likely parameters to rhwatch in RHIPE) - see rhipeControl and localDiskControl
verbose
logical - print messages about what is being done

Managing attributes of 'ddo' or 'ddf' objects

Description

Managing attributes of 'ddo' or 'ddf' objects

Usage

setAttributes(obj, attrs)
"setAttributes"(obj, attrs)
getAttribute(obj, attrName)
getAttributes(obj, attrNames)
"getAttributes"(obj, attrNames)
hasAttributes(obj, ...)
"hasAttributes"(obj, attrNames)

Arguments

obj
'ddo' or 'ddf' object
attrs
a named list of attributes to set
attrName
name of the attribute to get
attrNames
vector of names of the attributes to get
...
additional arguments

Digest File Hash Function

Description

Function to be used to specify the file where key-value pairs get stored for local disk connections, useful when keys are arbitrary objects. File names are determined using an MD5 hash of the object. This is the default argument for fileHashFn in localDiskConn.

Usage

digestFileHash(keys, conn)

Arguments

keys
keys to be hashed
conn
a "localDiskConn" object

Details

You shouldn't need to call this directly other than to experiment with what the output looks like or to get ideas on how to write your own custom hash.

See also

localDiskConn, charFileHash

Author

Ryan Hafen

Divide a Distributed Data Object

Description

Divide a ddo/ddf object into subsets based on different criteria

Usage

divide(data, by = NULL, spill = 1e+06, filterFn = NULL, bsvFn = NULL, output = NULL, overwrite = FALSE, preTransFn = NULL, postTransFn = NULL, params = NULL, packages = NULL, control = NULL, update = FALSE, verbose = TRUE)

Arguments

data
an object of class "ddf" or "ddo" - in the latter case, need to specify preTransFn to coerce each subset into a data frame
by
specification of how to divide the data - conditional (factor-level or shingles), random replicate, or near-exact replicate (to come) -- see details
spill
integer telling the division method how many lines of data should be collected until spilling over into a new key-value pair
filterFn
a function that is applied to each candidate output key-value pair to determine whether it should be part of the resulting division (the pair is kept if the function returns TRUE)
bsvFn
a function to be applied to each subset that returns a list of between subset variables (BSVs)
output
a "kvConnection" object indicating where the output data should reside (see localDiskConn, hdfsConn). If NULL (default), output will be an in-memory "ddo" object.
overwrite
logical; should existing output location be overwritten? (also can specify overwrite = "backup" to move the existing output to _bak)
preTransFn
a transformation function (if desired) to be applied to each subset prior to division - note: this is deprecated - instead use addTransform prior to calling divide
postTransFn
a transformation function (if desired) to apply to each post-division subset
params
a named list of parameters external to the input data that are needed in the distributed computing (most should be taken care of automatically such that this is rarely necessary to specify)
packages
a vector of R package names that contain functions used in fn (most should be taken care of automatically such that this is rarely necessary to specify)
control
parameters specifying how the backend should handle things (most-likely parameters to rhwatch in RHIPE) - see rhipeControl and localDiskControl
update
should a MapReduce job be run to obtain additional attributes for the result data prior to returning?
verbose
logical - print messages about what is being done

Value

an object of class "ddf" if the resulting subsets are data frames. Otherwise, an object of class "ddo".

Details

The division methods this function will support include conditioning variable division for factors (implemented -- see condDiv), conditioning variable division for numerical variables through shingles, random replicate (implemented -- see rrDiv), and near-exact replicate. If by is a vector of variable names, the data will be divided by these variables. Alternatively, this can be specified by e.g. condDiv(c("var1", "var2")).

See also

recombine, ddo, ddf, condDiv, rrDiv

Author

Ryan Hafen

Functions used in divide()

Description

Functions used in divide()

Usage

dfSplit(curDF, by, seed)
addSplitAttrs(curSplit, bsvFn, by, postTransFn = NULL)

Arguments

curDF,seed
arguments
curSplit,bsvFn,by,postTransFn
arguments

Division-Agnostic Aggregation

Description

Aggregates data by cross-classifying factors, with a formula interface similar to xtabs

Usage

drAggregate(formula, data = data, by = NULL, output = NULL, preTransFn = NULL, maxUnique = NULL, params = NULL, packages = NULL, control = NULL)

Arguments

formula
a formula object with the cross-classifying variables (separated by +) on the right hand side (or an object which can be coerced to a formula). Interactions are not allowed. On the left hand side, one may optionally give a variable name in the data representing counts; in the latter case, the columns are interpreted as corresponding to the levels of a variable. This is useful if the data have already been tabulated.
data
a "ddf" containing the variables in formula
by
an optional variable name or vector of variable names by which to split up tabulations (i.e. tabulate independently inside of each unique "by" variable value). The only difference between specifying "by" and placing the variable(s) in the right hand side of the formula is how the computation is done and how the result is returned.
output
"kvConnection" object indicating where the output data should reside in the case of by being specified (see localDiskConn, hdfsConn). If NULL (default), output will be an in-memory "ddo" object.
preTransFn
an optional function to apply to each subset prior to performing tabulation. The output from this function should be a data frame containing variables with names that match that of the formula provided. Note: this is deprecated - instead use addTransform prior to calling divide.
maxUnique
the maximum number of unique combinations of variables to obtain tabulations for. This is meant to help against cases where a variable in the formula has a very large number of levels, to the point that it is not meaningful to tabulate and is too computationally burdensome. If NULL, it is ignored. If a positive number, only the top and bottom maxUnique tabulations by frequency are kept.
params
a named list of parameters external to the input data that are needed in the distributed computing (most should be taken care of automatically such that this is rarely necessary to specify)
packages
a vector of R package names that contain functions used in fn (most should be taken care of automatically such that this is rarely necessary to specify)
control
parameters specifying how the backend should handle things (most-likely parameters to rhwatch in RHIPE) - see rhipeControl and localDiskControl

Value

a data frame of the tabulations. When "by" is specified, it is a ddf with each key-value pair corresponding to a unique "by" value, containing a data frame of tabulations.

Note

The interface is similar to xtabs, but instead of returning a full contingency table, data is returned in the form of a data frame only with rows for which there were positive counts. This result is more similar to what is returned by aggregate.
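The difference between a full contingency table and a data frame restricted to positive counts can be seen in base R on a small in-memory data set. This is only an illustration of the output shape described in the note, using xtabs itself rather than drAggregate; the count column name Freq comes from as.data.frame on a table.

```r
# Full cross-classification: one row per combination of levels, zeros included.
full <- as.data.frame(xtabs(~ cyl + gear, data = mtcars))

# Keep only the combinations that actually occur in the data.
positive <- full[full$Freq > 0, ]

n_dropped <- nrow(full) - nrow(positive)
```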

See also

xtabs, updateAttributes

Author

Ryan Hafen

Bag of Little Bootstraps Transformation Method

Description

Bag of little bootstraps transformation method

Usage

drBLB(x, statistic, metric, R, n)

Arguments

x
a subset of a ddf
statistic
a function to apply to the subset specifying the statistic to compute. Must have arguments 'data' and 'weights' (see details). Must return a vector, where each element is a statistic of interest.
metric
a function specifying the metric to be applied to the R bootstrap samples of each statistic returned by statistic. Expects an input vector and should output a vector.
R
the number of bootstrap samples
n
the total number of observations in the data

Details

It is necessary to specify weights as a parameter to the statistic function because for BLB to work efficiently, it must resample each time with a sample of size n. To make this computationally possible for very large n, we can use weights (see reference for details). Therefore, only methods with a weights option can legitimately be used here.
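The role of the weights can be sketched in base R. A resample of size n drawn from a subset of b rows is represented by a vector of multinomial counts rather than by n physical rows. This is a sketch of the weighting idea only, not drBLB's internals; the subset, the value of n, and the particular statistic and metric functions are assumptions made for the example.

```r
set.seed(42)
n <- 1e6          # total number of observations in the full data (assumed)
x <- mtcars       # stands in for one subset
b <- nrow(x)      # number of rows in the subset
R <- 100          # number of bootstrap samples

# statistic takes 'data' and 'weights' and returns a vector of statistics.
statistic <- function(data, weights) {
  c(mean_mpg = weighted.mean(data$mpg, weights))
}

# metric summarises the R bootstrap values of a statistic.
metric <- function(v) quantile(v, c(0.025, 0.975))

# Each resample of size n is encoded as b counts that sum to n.
boot <- vapply(seq_len(R), function(i) {
  w <- rmultinom(1, size = n, prob = rep(1 / b, b))[, 1]
  statistic(x, w)
}, numeric(1))

interval <- metric(boot)
```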

References

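Kleiner, A., Talwalkar, A., Sarkar, P. and Jordan, M. I. (2014) A scalable bootstrap for massive data. Journal of the Royal Statistical Society: Series B 76(4), 795--816.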

See also

divide, recombine

Author

Ryan Hafen

Filter a 'ddo' or 'ddf' Object

Description

Filter a 'ddo' or 'ddf' object

Usage

drFilter(x, filterFn, output = NULL, overwrite = FALSE, params = NULL, packages = NULL, control = NULL)

Arguments

x
an object of class 'ddo' or 'ddf'
filterFn
function that takes the keys and/or values and returns either TRUE or FALSE - if TRUE, that key-value pair will be present in the result
output
a "kvConnection" object indicating where the output data should reside (see localDiskConn, hdfsConn). If NULL (default), output will be an in-memory "ddo" object.
overwrite
logical; should existing output location be overwritten? (also can specify overwrite = "backup" to move the existing output to _bak)
params
a named list of parameters external to the input data that are needed in the distributed computing (most should be taken care of automatically such that this is rarely necessary to specify)
packages
a vector of R package names that contain functions used in fn (most should be taken care of automatically such that this is rarely necessary to specify)
control
parameters specifying how the backend should handle things (most-likely parameters to rhwatch in RHIPE) - see rhipeControl and localDiskControl

Value

a 'ddo' or 'ddf' object
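As an illustration of what a filter function does, the base R sketch below keeps only the key-value pairs whose value satisfies a predicate. It assumes a one-argument filterFn that receives the value; the in-memory list of pairs stands in for a 'ddo' object and is not datadr's representation.

```r
# Build key-value pairs from the iris data, one pair per species.
s <- split(iris, iris$Species)
kv <- unname(Map(function(k, v) list(k, v), names(s), s))

# Keep a pair only if the mean sepal width of its value exceeds 3.
filterFn <- function(v) mean(v$Sepal.Width) > 3

kept <- Filter(function(pair) isTRUE(filterFn(pair[[2]])), kv)
kept_keys <- vapply(kept, function(pair) pair[[1]], character(1))
```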

See also

drJoin, drLapply

Author

Ryan Hafen

Get Global Variables and Package Dependencies

Description

Get global variables and package dependencies for a function

Usage

drGetGlobals(f)

Arguments

f
function

Value

a list of variables (named by variable) and a vector of package names

Details

This traverses the parent environments of the supplied function and finds all global variables using findGlobals and retrieves their values. All package function calls are also found and a list of required packages is also returned.
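The findGlobals step mentioned above can be tried directly with the codetools package, which ships with standard R installations. The sketch below is a simplified illustration rather than drGetGlobals itself: it looks up variable values only and does not resolve package dependencies. The function f and the variable scale_factor are made up for the example.

```r
library(codetools)

scale_factor <- 2.5
f <- function(x) x * scale_factor

# Separate the global functions from the global variables used by f.
g <- findGlobals(f, merge = FALSE)

# Retrieve the values of the global variables, searching from f's environment.
vars <- mget(g$variables, envir = environment(f), inherits = TRUE)
```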

Author

Ryan Hafen

GLM Transformation Method

Description

GLM transformation method

Usage

drGLM(...)

Arguments

...
arguments you would pass to the glm function

Details

This provides a transformation function to be called for each subset in a recombination MapReduce job that applies R's glm method and outputs the coefficients in a way that combMeanCoef knows how to deal with. It can be applied to a ddf with addTransform prior to calling recombine.

See also

divide, recombine, rrDiv

Author

Ryan Hafen

HexBin Aggregation for Distributed Data Frames

Description

Create "hexbin" object of hexagonally binned data for a distributed data frame. This computation is division agnostic - it does not matter how the data frame is split up.

Usage

drHexbin(data, xVar, yVar, xTransFn = identity, yTransFn = identity, xbins = 30, shape = 1, params = NULL, packages = NULL, control = NULL)

Arguments

data
a distributed data frame
xVar,yVar
names of the variables to use
xTransFn,yTransFn
a transformation function to apply to the x and y variables prior to binning
xbins
the number of bins partitioning the range of xbnds
shape
the shape = yheight/xwidth of the plotting regions
params
a named list of parameters external to the input data that are needed in the distributed computing (most should be taken care of automatically such that this is rarely necessary to specify)
packages
a vector of R package names that contain functions used in fn (most should be taken care of automatically such that this is rarely necessary to specify)
control
parameters specifying how the backend should handle things (most-likely parameters to rhwatch in RHIPE) - see rhipeControl and localDiskControl

Value

a "hexbin" object

References

Carr, D. B. et al. (1987) Scatterplot Matrix Techniques for Large N. JASA 82, 398, 424--436.

See also

drQuantile

Author

Ryan Hafen

Join Two Data Sources by Key

Description

Join two data sources by key

Usage

drJoin(..., output = NULL, overwrite = FALSE, postTransFn = NULL, params = NULL, packages = NULL, control = NULL)

Arguments

output
a "kvConnection" object indicating where the output data should reside (see localDiskConn, hdfsConn). If NULL (default), output will be an in-memory "ddo" object.
overwrite
logical; should existing output location be overwritten? (also can specify overwrite = "backup" to move the existing output to _bak)
postTransFn
an optional function to be applied to each final key-value pair after joining
params
a named list of parameters external to the input data that are needed in the distributed computing (most should be taken care of automatically such that this is rarely necessary to specify)
packages
a vector of R package names that contain functions used in fn (most should be taken care of automatically such that this is rarely necessary to specify)
control
parameters specifying how the backend should handle things (most-likely parameters to rhwatch in RHIPE) - see rhipeControl and localDiskControl
...
named lists of input objects - assumed that all are of same type (all HDFS, all localDisk, all in-memory)

Value

a 'ddo' object stored in the output connection, where the values are named lists with names according to the names given to the input data objects, and values are the corresponding data
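The shape of the joined values can be sketched in base R on plain lists of key-value pairs. This is an illustration of the result structure only, under the assumption that a key present in just some of the inputs is kept with the values that exist for it. The input names a and b, the keys, and the helper lookup are hypothetical.

```r
# Two hypothetical inputs, each a list of key-value pairs.
inputs <- list(
  a = list(list("k1", 1), list("k2", 2)),
  b = list(list("k2", "x"), list("k3", "y"))
)

# Look up the value stored under a key, or NULL if the key is absent.
lookup <- function(kv, key) {
  hit <- Filter(function(pair) identical(pair[[1]], key), kv)
  if (length(hit) > 0) hit[[1]][[2]] else NULL
}

keys <- unique(unlist(lapply(inputs, function(kv) {
  vapply(kv, function(pair) pair[[1]], character(1))
})))

# For each key, collect the values into a list named by input.
joined <- lapply(keys, function(key) {
  vals <- lapply(inputs, lookup, key = key)
  list(key, Filter(Negate(is.null), vals))
})
```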

See also

drFilter, drLapply

Author

Ryan Hafen

Apply a function to all key-value pairs of a ddo/ddf object

Description

Apply a function to all key-value pairs of a ddo/ddf object and get a new ddo object back, unless a different combine strategy is specified.

Usage

drLapply(X, FUN, combine = combDdo(), output = NULL, overwrite = FALSE, params = NULL, packages = NULL, control = NULL, verbose = TRUE)

Arguments

X
an object of class "ddo" or "ddf"
FUN
a function to be applied to each subset
combine
optional method to combine the results
output
a "kvConnection" object indicating where the output data should reside (see localDiskConn, hdfsConn). If NULL (default), output will be an in-memory "ddo" object.
overwrite
logical; should existing output location be overwritten? (also can specify overwrite = "backup" to move the existing output to _bak)
params
a named list of parameters external to the input data that are needed in the distributed computing (most should be taken care of automatically such that this is rarely necessary to specify)
packages
a vector of R package names that contain functions used in fn (most should be taken care of automatically such that this is rarely necessary to specify)
control
parameters specifying how the backend should handle things (most-likely parameters to rhwatch in RHIPE) - see rhipeControl and localDiskControl
verbose
logical - print messages about what is being done

Value

depends on combine

See also

recombine, drFilter, drJoin, combDdo, combRbind

Author

Ryan Hafen

Sample Quantiles for 'ddf' Objects

Description

Compute sample quantiles for 'ddf' objects

Usage

drQuantile(x, var, by = NULL, probs = seq(0, 1, 0.005), preTransFn = NULL, varTransFn = identity, nBins = 10000, tails = 100, params = NULL, packages = NULL, control = NULL, ...)

Arguments

x
a 'ddf' object
var
the name of the variable to compute quantiles for
by
an optional variable name or vector of variable names by which to group quantile computations
probs
numeric vector of probabilities with values in [0, 1]
preTransFn
a transformation function (if desired) to be applied to each subset prior to computing quantiles (here it may be useful for adding a "by" variable that is not present) - note: this transformation should not modify var (use varTransFn for that) - also note: this is deprecated - instead use addTransform prior to calling divide
varTransFn
transformation to apply to variable prior to computing quantiles
nBins
how many bins should the range of the variable be split into?
tails
how many exact values at each tail should be retained?
params
a named list of parameters external to the input data that are needed in the distributed computing (most should be taken care of automatically such that this is rarely necessary to specify)
packages
a vector of R package names that contain functions used in fn (most should be taken care of automatically such that this is rarely necessary to specify)
control
parameters specifying how the backend should handle things (most-likely parameters to rhwatch in RHIPE) - see rhipeControl and localDiskControl
...
additional arguments

Value

data frame of quantiles q and their associated f-value fval. If by is specified, then also a variable group.

Details

This division-agnostic quantile calculation algorithm takes the range of the variable of interest and splits it into nBins bins, tabulates counts for those bins, and reconstructs a quantile approximation from them. nBins should not get too large, but larger nBins gives more accuracy. If tails is positive, the first and last tails ordered values are attached to the quantile estimate - this is useful for long-tailed distributions or distributions with outliers for which you would like more detail in the tails.
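The binning approach described above can be sketched in base R. This is a simplified illustration, not drQuantile's code: it omits the tails handling, computes the range directly from the full vector for brevity, and uses bin midpoints as the quantile estimates. The simulated data and the split into ten subsets are assumptions.

```r
set.seed(1)
x <- rnorm(1e5)
subsets <- split(x, rep(1:10, length.out = length(x)))  # an arbitrary division

nBins <- 1000
breaks <- seq(min(x), max(x), length.out = nBins + 1)
mids <- (breaks[-1] + breaks[-length(breaks)]) / 2

# Map: tabulate bin counts within each subset. Reduce: add the counts together.
counts <- Reduce(`+`, lapply(subsets, function(s) {
  tabulate(findInterval(s, breaks, all.inside = TRUE), nbins = nBins)
}))

# Reconstruct each quantile as the midpoint of the first bin whose
# cumulative proportion reaches the requested probability.
probs <- seq(0, 1, 0.005)
cum <- cumsum(counts) / sum(counts)
approx_q <- mids[vapply(probs, function(p) which(cum >= p)[1], integer(1))]

# Largest discrepancy from the exact sample quantiles.
max_err <- max(abs(approx_q - quantile(x, probs)))
```

Because only the bin counts are combined, the result does not depend on how the values are divided among subsets, and the approximation error is tied to the bin width, which is why a larger nBins gives more accuracy.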

See also

updateAttributes

Author

Ryan Hafen

Data Input

Description

Reads a text file in table format and creates a distributed data frame from it, with cases corresponding to lines and variables to fields in the file.

Usage

drRead.table(file, header = FALSE, sep = "", quote = "\"'", dec = ".", skip = 0, fill = !blank.lines.skip, blank.lines.skip = TRUE, comment.char = "#", allowEscapes = FALSE, encoding = "unknown", autoColClasses = TRUE, rowsPerBlock = 50000, postTransFn = identity, output = NULL, overwrite = FALSE, params = NULL, packages = NULL, control = NULL, ...)
drRead.csv(file, header = TRUE, sep = ",", quote = "\"", dec = ".", fill = TRUE, comment.char = "", ...)
drRead.csv2(file, header = TRUE, sep = ";", quote = "\"", dec = ",", fill = TRUE, comment.char = "", ...)
drRead.delim(file, header = TRUE, sep = "\t", quote = "\"", dec = ".", fill = TRUE, comment.char = "", ...)
drRead.delim2(file, header = TRUE, sep = "\t", quote = "\"", dec = ",", fill = TRUE, comment.char = "", ...)

Arguments

file
input text file - can either be character string pointing to a file on local disk, or an hdfsConn object pointing to a text file on HDFS (see output argument below)
header
this and the other parameters below are passed to read.table for each chunk being processed - see read.table for more info. Almost all have defaults, or appropriate defaults are set through other format-specific functions such as drRead.csv and drRead.delim.
sep
see read.table for more info
quote
see read.table for more info
dec
see read.table for more info
skip
see read.table for more info
fill
see read.table for more info
blank.lines.skip
see read.table for more info
comment.char
see read.table for more info
allowEscapes
see read.table for more info
encoding
see read.table for more info
autoColClasses
should column classes be determined automatically by reading in a sample? This can sometimes be problematic because of strange ways R handles quotes in read.table, but keeping the default of TRUE is advantageous for speed.
rowsPerBlock
how many rows of the input file should make up a block (key-value pair) of output?
postTransFn
a function to be applied after a block is read in to provide any additional processing before the block is stored
output
a "kvConnection" object indicating where the output data should reside. Must be a localDiskConn object if input is a text file on local disk, or an hdfsConn object if input is a text file on HDFS.
overwrite
logical; should existing output location be overwritten? (also can specify overwrite = "backup" to move the existing output to _bak)
params
a named list of parameters external to the input data that are needed in postTransFn
packages
a vector of R package names that contain functions used in postTransFn (most should be taken care of automatically such that this is rarely necessary to specify)
control
parameters specifying how the backend should handle things (most-likely parameters to rhwatch in RHIPE) - see rhipeControl and localDiskControl
...
see read.table for more info

Value

an object of class "ddf"

Note

For local disk, the file is actually read in sequentially instead of in parallel. This is because of possible performance issues when trying to read from the same disk in parallel.

Note that if skip is positive and/or header is TRUE, these lines are read in first, since they occur only once in the data. Each block is then checked for these lines, and they are removed if they appear.

Also note that if you supply "Factor" column classes, they will be converted to character.
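
The block-wise reading and header handling described above can be illustrated in base R. This is a minimal sketch, not datadr's implementation: the helper name readInBlocks is hypothetical, the whole file is read into memory for simplicity, and the header is assumed to be a plain unquoted line.

```r
# Hypothetical helper (not part of datadr): split a delimited text file into
# blocks of `rowsPerBlock` lines and parse each block with read.table().
readInBlocks <- function(file, rowsPerBlock = 3, sep = ",") {
  header <- readLines(file, n = 1)          # read the header once, up front
  colNames <- strsplit(header, sep, fixed = TRUE)[[1]]
  lines <- readLines(file)
  blocks <- split(lines, ceiling(seq_along(lines) / rowsPerBlock))
  lapply(seq_along(blocks), function(i) {
    block <- blocks[[i]]
    block <- block[block != header]         # drop the header if it appears
    value <- read.table(text = block, sep = sep, header = FALSE,
      col.names = colNames, stringsAsFactors = FALSE)
    list(i, value)                          # key = block index, value = rows
  })
}

tmp <- tempfile(fileext = ".csv")
write.table(data.frame(x = 1:7, y = (1:7)^2), tmp, sep = ",",
  row.names = FALSE, quote = FALSE)
kv <- readInBlocks(tmp, rowsPerBlock = 3)
length(kv)                                  # number of key-value pairs
sapply(kv, function(p) nrow(p[[2]]))        # rows per block
```

Each element of kv is a two-element list (key, value), the same shape the reference uses for key-value pairs.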

Author

Ryan Hafen

Take a Sample of Key-Value Pairs

Description

Take a sample of key-value pairs

Usage

drSample(x, fraction, output = NULL, overwrite = FALSE, control = NULL)

Arguments

x
a 'ddo' or 'ddf' object
fraction
fraction of key-value pairs to keep (between 0 and 1)
output
a "kvConnection" object indicating where the output data should reside (see localDiskConn, hdfsConn). If NULL (default), output will be an in-memory "ddo" object.
overwrite
logical; should existing output location be overwritten? (also can specify overwrite = "backup" to move the existing output to _bak)
control
parameters specifying how the backend should handle things (most-likely parameters to rhwatch in RHIPE) - see rhipeControl and localDiskControl
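
As an illustration of what sampling a fraction of key-value pairs means, here is a minimal base-R sketch on an in-memory list. The function sampleKV is hypothetical, and it assumes an independent keep/drop decision per pair; the reference does not say whether drSample samples this way or draws an exact count.

```r
# Illustrative only (not datadr code): keep each key-value pair with
# probability `fraction`.
sampleKV <- function(kvList, fraction, seed = NULL) {
  stopifnot(fraction >= 0, fraction <= 1)
  if (!is.null(seed)) set.seed(seed)
  keep <- runif(length(kvList)) < fraction  # one random draw per pair
  kvList[keep]
}

kvList <- lapply(1:100, function(i) list(paste0("key", i), i))
smp <- sampleKV(kvList, fraction = 0.25, seed = 1)
length(smp)                                 # roughly a quarter of the pairs
```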

Subsetting Distributed Data Frames

Description

Return a subset of a "ddf" object to memory

Usage

drSubset(data, subset = NULL, select = NULL, drop = FALSE, preTransFn = NULL, maxRows = 5e+05, params = NULL, packages = NULL, control = NULL, verbose = TRUE)

Arguments

data
object to be subsetted -- an object of class "ddf" or "ddo" - in the latter case, need to specify preTransFn to coerce each subset into a data frame
subset
logical expression indicating elements or rows to keep: missing values are taken as false
select
expression, indicating columns to select from a data frame
drop
passed on to [ indexing operator
preTransFn
a transformation function (if desired) to be applied to each subset prior to division - note: this is deprecated - instead use addTransform prior to calling divide
maxRows
the maximum number of rows to return
params
a named list of parameters external to the input data that are needed in the distributed computing (most should be taken care of automatically such that this is rarely necessary to specify)
packages
a vector of R package names that contain functions used in fn (most should be taken care of automatically such that this is rarely necessary to specify)
control
parameters specifying how the backend should handle things (most-likely parameters to rhwatch in RHIPE) - see rhipeControl and localDiskControl
verbose
logical - print messages about what is being done

Value

data frame

Author

Ryan Hafen

"Flatten" a ddf Subset

Description

Add split variables and BSVs (if any) as columns to a subset of a ddf.

Usage

flatten(x)

Arguments

x
a value of a key-value pair

See also

getSplitVars, getBsvs

Get Between Subset Variable

Description

For a given key-value pair, get a BSV variable value by name (if present)

Usage

getBsv(x, name)

Arguments

x
a key-value pair or a value
name
the name of the BSV to get

Get Between Subset Variables

Description

For a given key-value pair, extract all BSVs

Usage

getBsvs(x)

Arguments

x
a key-value pair or a value

Get names of the conditioning variable cuts

Description

Used internally but exported for certain reasons. Do not use explicitly.

Usage

getCondCuts(df, splitVars)

Arguments

df
a data frame
splitVars
a vector of variable names to split by

Extract "Split" Variable

Description

For a given key-value pair or value, get a split variable value by name, if present (split variables are variables that define how the data was divided).

Usage

getSplitVar(x, name)

Arguments

x
a key-value pair or a value
name
the name of the split variable to get

Extract "Split" Variables

Description

For a given k/v pair or value, extract all split variables (split variables are variables that define how the data was divided).

Usage

getSplitVars(x)

Arguments

x
a key-value pair or a value

Connect to Data Source on HDFS

Description

Connect to a data source on HDFS

Usage

hdfsConn(loc, type = "sequence", autoYes = FALSE, reset = FALSE, verbose = TRUE)

Arguments

loc
location on HDFS for the data source
type
the type of data ("map", "sequence", "text")
autoYes
automatically answer "yes" to questions about creating a path on HDFS
reset
should existing metadata for this object be overwritten?
verbose
logical - print messages about what is being done

Value

a "kvConnection" object of class "hdfsConn"

Details

This simply creates a "connection" to a directory on HDFS (which need not have data in it). To actually do things with this data, see ddo, etc.

See also

addData, ddo, ddf, localDiskConn

Author

Ryan Hafen

Apply Function to Key-Value Pair

Description

Apply a function to a single key-value pair - not a traditional R "apply" function.

Usage

kvApply(fn, kvPair, returnKV = FALSE)

Arguments

fn
a function
kvPair
a key-value pair (a list with 2 elements)
returnKV
should the key and value be returned?

Details

Determines how a function should be applied to a key-value pair and then applies it: if the function has two formals, it applies the function giving it the key and the value as the arguments; if the function has one formal, it applies the function giving it just the value. This provides flexibility and simplicity for when a function is only meant to be applied to the value, but still allows keys to be used if desired.
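
The dispatch rule described above can be sketched in a few lines of base R. This is a hypothetical re-implementation for illustration, not the package source; in particular, it assumes that returnKV = TRUE means the key is returned alongside the result.

```r
# Hypothetical re-implementation of the dispatch rule (not datadr's source).
applyToKV <- function(fn, kvPair, returnKV = FALSE) {
  nArgs <- length(formals(fn))
  # two formals: pass key and value; otherwise pass only the value
  res <- if (nArgs >= 2) fn(kvPair[[1]], kvPair[[2]]) else fn(kvPair[[2]])
  if (returnKV) list(kvPair[[1]], res) else res
}

kv <- list("setosa", data.frame(len = c(5.1, 4.9, 4.7)))
meanLen <- applyToKV(function(v) mean(v$len), kv)           # value only
label <- applyToKV(function(k, v) paste(k, nrow(v)), kv)    # key and value
```

The one-argument form keeps simple per-subset functions free of key handling, while the two-argument form is available when the key carries information the function needs.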

Manually Specify Key-Value Pairs

Description

Manually specify key-value pairs

Usage

kvPairs(...)

Arguments

...
key-value pairs (lists with two elements)

Value

a list of objects of class "kvPair"

Author

Ryan Hafen

Connect to Data Source on Local Disk

Description

Connect to a data source on local disk

Usage

localDiskConn(loc, nBins = 0, fileHashFn = NULL, autoYes = FALSE, reset = FALSE, verbose = TRUE)

Arguments

loc
location on local disk for the data source
nBins
number of bins (subdirectories) to put data files into - if anticipating a large number of k/v pairs, it is a good idea to set this to something bigger than 0
fileHashFn
an optional function that operates on each key-value pair to determine the subdirectory structure for where the data should be stored for that subset, or can be specified "asis" when keys are scalar strings
autoYes
automatically answer "yes" to questions about creating a path on local disk
reset
should existing metadata for this object be overwritten?
verbose
logical - print messages about what is being done

Value

a "kvConnection" object of class "localDiskConn"

Details

This simply creates a "connection" to a directory on local disk (which need not have data in it). To actually do things with this connection, see ddo, etc. Typically, you should just use loc to specify where the data is or where you would like data for this connection to be stored. Metadata for the object is also stored in this directory.

See also

addData, ddo, ddf, hdfsConn

Author

Ryan Hafen

Specify Control Parameters for MapReduce on a Local Disk Connection

Description

Specify control parameters for a MapReduce job on a local disk connection.

Usage

localDiskControl(cluster = NULL, map_buff_size_bytes = 10485760, reduce_buff_size_bytes = 10485760, map_temp_buff_size_bytes = 10485760)

Arguments

cluster
a "cluster" object obtained from makeCluster to allow for parallel processing
map_buff_size_bytes
determines how much data should be sent to each map task
reduce_buff_size_bytes
determines how much data should be sent to each reduce task
map_temp_buff_size_bytes
determines the size of chunks written to disk in between the map and reduce

Note

If you have data on a shared drive that multiple nodes can access or a high performance shared file system like Lustre, you can run a local disk MapReduce job on multiple nodes by creating a multi-node cluster with makeCluster.

If you are using multiple cores and the input data is very small, map_buff_size_bytes needs to be small so that the key-value pairs will be split across cores.

Take a ddo/ddf HDFS data object and turn it into a mapfile

Description

Take a ddo/ddf HDFS data object and turn it into a mapfile

Usage

makeExtractable(obj)

Arguments

obj
object of class 'ddo' or 'ddf' with an HDFS connection

Execute a MapReduce Job

Description

Execute a MapReduce job

Usage

mrExec(data, setup = NULL, map = NULL, reduce = NULL, output = NULL, overwrite = FALSE, control = NULL, params = NULL, packages = NULL, verbose = TRUE)

Arguments

data
a ddo/ddf object, or list of ddo/ddf objects
setup
an expression of R code (created using the R command expression) to be run before map and reduce
map
an R expression that is evaluated during the map stage. For each task, this expression is executed multiple times (see details).
reduce
a vector of R expressions with names pre, reduce, and post that is evaluated during the reduce stage. For example reduce = expression(pre = {...}, reduce = {...}, post = {...}). reduce is optional: if it is not specified, a default identity reduce is performed and the map output key-value pairs are the result. Setting it to 0 skips the reduce altogether.
output
a "kvConnection" object indicating where the output data should reside (see localDiskConn, hdfsConn). If NULL (default), output will be an in-memory "ddo" object.
overwrite
logical; should existing output location be overwritten? (also can specify overwrite = "backup" to move the existing output to _bak)
control
parameters specifying how the backend should handle things (most-likely parameters to rhwatch in RHIPE) - see rhipeControl and localDiskControl
params
a named list of parameters external to the input data that are needed in the map or reduce phases
packages
a vector of R package names that contain functions used in fn (most should be taken care of automatically such that this is rarely necessary to specify)
verbose
logical - print messages about what is being done
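
To make the roles of the map stage and the pre, reduce, and post steps concrete, here is a toy in-memory MapReduce in base R. It is only a sketch of the general pattern: mrExec itself takes R expressions and runs on the configured backend, whereas this example uses plain functions, and every name in it (simpleMapReduce, mapFn, reduceFn) is hypothetical.

```r
# Toy in-memory MapReduce (illustration only, not how mrExec is implemented).
simpleMapReduce <- function(kvList, mapFn, pre, reduceFn, post) {
  # map: each input pair may emit any number of (key, value) pairs
  emitted <- do.call(c, lapply(kvList, function(kv) mapFn(kv[[1]], kv[[2]])))
  # shuffle: group the emitted values by key
  keys <- vapply(emitted, function(kv) as.character(kv[[1]]), character(1))
  groups <- split(lapply(emitted, `[[`, 2), keys)
  # reduce: pre initializes state, reduceFn folds in each value, post finishes
  lapply(names(groups), function(k) {
    state <- pre()
    for (v in groups[[k]]) state <- reduceFn(state, v)
    list(k, post(state))
  })
}

# Input: mtcars in four blocks of eight rows; goal: mean mpg per cyl value
chunks <- split(mtcars, rep(1:4, each = 8))
kvList <- lapply(seq_along(chunks), function(i) list(i, chunks[[i]]))
mapFn <- function(k, v) {
  byCyl <- split(v$mpg, v$cyl)
  lapply(names(byCyl), function(cyl)
    list(cyl, c(sum = sum(byCyl[[cyl]]), n = length(byCyl[[cyl]]))))
}
res <- simpleMapReduce(kvList, mapFn,
  pre = function() c(sum = 0, n = 0),
  reduceFn = function(state, v) state + v,
  post = function(state) state[["sum"]] / state[["n"]])
```

The map step emits partial sums and counts rather than means, so that the reduce step can combine results from different blocks exactly.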

Value

"ddo" object - to keep it simple. It is up to the user to update or cast as "ddf" if that is the desired result.

Author

Ryan Hafen

Functions to Compute Summary Statistics in MapReduce

Description

Functions that are used to tabulate categorical variables and compute moments for numeric variables through the MapReduce framework. Used in updateAttributes.

Usage

tabulateMap(formula, data)
tabulateReduce(result, reduce.values, maxUnique = NULL)
calculateMoments(y, order = 1, na.rm = TRUE)
combineMoments(m1, m2)
combineMultipleMoments(...)
moments2statistics(m)

Arguments

formula
a formula to be used in xtabs
data
a subset of a 'ddf' object
result,reduce.values
inconsequential tabulateReduce parameters
maxUnique
the maximum number of unique combinations of variables to obtain tabulations for. This is meant to help against cases where a variable in the formula has a very large number of levels, to the point that it is not meaningful to tabulate and is too computationally burdensome. If NULL, it is ignored. If a positive number, only the top and bottom maxUnique tabulations by frequency are kept.
y,order,na.rm
inconsequential calculateMoments parameters
m1,m2
inconsequential combineMoments parameters
m
inconsequential moments2statistics parameters
...
inconsequential parameters
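
The idea behind computing moments per subset and then combining them can be sketched in base R. The example below uses the standard pairwise update for the count, mean, and sum of squared deviations; it covers only the first two moments, and the helper names (chunkMoments, mergeMoments) are hypothetical. The internal representation used by calculateMoments and combineMoments may differ.

```r
# Per-chunk summary: count, mean, and sum of squared deviations (M2)
chunkMoments <- function(y) {
  y <- y[!is.na(y)]
  m <- mean(y)
  list(n = length(y), mean = m, M2 = sum((y - m)^2))
}

# Merge two summaries without revisiting the raw data
mergeMoments <- function(a, b) {
  n <- a$n + b$n
  delta <- b$mean - a$mean
  list(n = n,
    mean = a$mean + delta * b$n / n,
    M2 = a$M2 + b$M2 + delta^2 * a$n * b$n / n)
}

x <- c(2, 4, 4, 4, 5, 5, 7, 9)
parts <- lapply(split(x, rep(1:2, each = 4)), chunkMoments)
tot <- Reduce(mergeMoments, parts)
overallMean <- tot$mean
overallVar <- tot$M2 / (tot$n - 1)          # sample variance from merged M2
```

Because the merge step needs only the three summary values, it fits naturally into a reduce stage.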

Print a "ddo" or "ddf" Object

Description

Print an overview of attributes of distributed data objects (ddo) or distributed data frames (ddf)

Usage

"print"(x, ...)

Arguments

x
object to be printed
...
additional arguments

Author

Ryan Hafen

Print a key-value pair

Description

Print a key-value pair

Usage

"print"(x, ...)

Arguments

x
object to be printed
...
additional arguments

Print value of a key-value pair

Description

Print value of a key-value pair

Usage

"print"(x, ...)

Arguments

x
object to be printed
...
additional arguments

Experimental HDFS text reader helper function

Description

Experimental helper function for reading text data on HDFS into an HDFS connection

Usage

readHDFStextFile(input, output = NULL, overwrite = FALSE, fn = NULL, keyFn = NULL, linesPerBlock = 10000, control = NULL, update = FALSE)

Arguments

input
a RHIPE input text handle created with rhfmt
output
an output connection such as those created with localDiskConn and hdfsConn
overwrite
logical; should existing output location be overwritten? (also can specify overwrite = "backup" to move the existing output to _bak)
fn
function to be applied to each chunk of lines (input to function is a vector of strings)
keyFn
optional function to determine the value of the key for each block
linesPerBlock
how many lines at a time to read
control
parameters specifying how the backend should handle things (most-likely parameters to rhwatch in RHIPE) - see rhipeControl and localDiskControl
update
should a MapReduce job be run to obtain additional attributes for the result data prior to returning?

Experimental sequential text reader helper function

Description

Experimental helper function for reading text data sequentially from a file on disk and adding to connection using addData

Usage

readTextFileByChunk(input, output, overwrite = FALSE, linesPerBlock = 10000, fn = NULL, header = TRUE, skip = 0, recordEndRegex = NULL, cl = NULL)

Arguments

input
the path to an input text file
output
an output connection such as those created with localDiskConn and hdfsConn
overwrite
logical; should existing output location be overwritten? (also can specify overwrite = "backup" to move the existing output to _bak)
linesPerBlock
how many lines at a time to read
fn
function to be applied to each chunk of lines (see details)
header
does the file have a header
skip
number of lines to skip before reading
recordEndRegex
an optional regular expression that finds lines in the text file that indicate the end of a record (for multi-line records)
cl
a "cluster" object to be used for parallel processing, created using makeCluster

Details

The function fn should have one argument, which should expect to receive a vector of strings, each element of which is a line in the file. It is also possible for fn to take two arguments, in which case the second argument is the header line from the file (some parsing methods might need to know the header).
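
A parsing function of the two-argument form might look like the sketch below. The name parseChunk and the comma-separated layout are assumptions for illustration; this page does not state what fn must return, so the sketch simply returns a data frame.

```r
# Hypothetical two-argument parsing function: `lines` is a character vector
# of raw lines from the file, `header` is the file's header line.
parseChunk <- function(lines, header) {
  colNames <- strsplit(header, ",", fixed = TRUE)[[1]]
  read.csv(text = lines, header = FALSE, col.names = colNames,
    stringsAsFactors = FALSE)
}

parsed <- parseChunk(c("1,2.5", "2,3.5"), header = "id,score")
```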

Recombine

Description

Apply an analytic recombination method to a ddo/ddf object and combine the results

Usage

recombine(data, combine = NULL, apply = NULL, output = NULL, overwrite = FALSE, params = NULL, packages = NULL, control = NULL, verbose = TRUE)

Arguments

data
an object of class "ddo" or "ddf"
combine
the method to combine the results
apply
a function specifying the analytic method to apply to each subset, or a pre-defined apply function (see drBLB, drGLM, for example)
output
a "kvConnection" object indicating where the output data should reside (see localDiskConn, hdfsConn). If NULL (default), output will be an in-memory "ddo" object.
overwrite
logical; should existing output location be overwritten? (also can specify overwrite = "backup" to move the existing output to _bak)
params
a named list of parameters external to the input data that are needed in the distributed computing (most should be taken care of automatically such that this is rarely necessary to specify)
packages
a vector of R package names that contain functions used in fn (most should be taken care of automatically such that this is rarely necessary to specify)
control
parameters specifying how the backend should handle things (most-likely parameters to rhwatch in RHIPE) - see rhipeControl and localDiskControl
verbose
logical - print messages about what is being done

Value

depends on combine
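
The apply-then-combine pattern can be illustrated without any backend. In this base-R sketch the subsets are held in a plain list, the analytic method is a linear model fit per subset, and the results are combined by averaging the coefficients. All names are hypothetical, and the averaging step stands in for a combine method only as an example.

```r
# Divide: one subset per species, stored as key-value pairs
subsets <- split(iris, iris$Species)
kvList <- lapply(names(subsets), function(k) list(k, subsets[[k]]))

# Apply: fit the same model to every subset
applyFn <- function(v) coef(lm(Sepal.Length ~ Petal.Length, data = v))
applied <- lapply(kvList, function(kv) list(kv[[1]], applyFn(kv[[2]])))

# Combine: average the per-subset coefficient vectors
combined <- Reduce(`+`, lapply(applied, `[[`, 2)) / length(applied)
```

Other combine choices, such as collecting the per-subset results into a list or row-binding them, change only the last step.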

See also

divide, ddo, ddf, drGLM, drBLB, combMeanCoef, combMean, combCollect, combRbind, drLapply

Author

Ryan Hafen

Remove Key-Value Pairs from a Data Connection

Description

Remove key-value pairs from a data connection

Usage

removeData(conn, keys)

Arguments

conn
a kvConnection object
keys
a list of keys indicating which k/v pairs to remove

Note

This is generally not recommended for HDFS as it writes a new file each time it is called, and can result in more individual files than Hadoop likes to deal with.

See also

addData, localDiskConn, hdfsConn

Author

Ryan Hafen

Specify Control Parameters for RHIPE Job

Description

Specify control parameters for a RHIPE job. See rhwatch for details about each of the parameters.

Usage

rhipeControl(mapred = NULL, setup = NULL, combiner = FALSE, cleanup = NULL, orderby = "bytes", shared = NULL, jarfiles = NULL, zips = NULL, jobname = "")

Arguments

mapred,setup,combiner,cleanup,orderby,shared,jarfiles,zips,jobname
arguments to rhwatch in RHIPE

Random Replicate Division

Description

Specify random replicate division parameters for data division

Usage

rrDiv(nrows = NULL, seed = NULL)

Arguments

nrows
number of rows each subset should have
seed
the random seed to use (experimental)

Value

a list to be used for the "by" argument to divide

Details

The random replicate division method currently gets the total number of rows of the input data and divides it by nrows to get the number of subsets. Then it randomly assigns each row of the input data to one of the subsets, resulting in subsets with approximately nrows rows. A future implementation will make each subset have exactly nrows rows.
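
The assignment scheme described above can be sketched in base R on an in-memory data frame. The helper randomReplicate is hypothetical, and the rounding used to get the number of subsets is an assumption, since the text only says the row count is divided by nrows.

```r
# Illustrative sketch (not datadr code): assign each row to a random subset
# so that subsets have approximately `nrows` rows each.
randomReplicate <- function(df, nrows, seed = NULL) {
  if (!is.null(seed)) set.seed(seed)
  nSubsets <- max(1, round(nrow(df) / nrows))   # assumed rounding rule
  id <- sample(nSubsets, nrow(df), replace = TRUE)
  split(df, id)
}

subsets <- randomReplicate(iris, nrows = 50, seed = 42)
sizes <- sapply(subsets, nrow)              # close to 50 each, not exactly 50
```

Because each row is assigned independently, subset sizes vary around nrows, which is the "approximately" in the description.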

See also

divide, recombine, condDiv

Author

Ryan Hafen

Set up transformation environment

Description

This is called internally in the map phase of datadr MapReduce jobs. It is not meant for use elsewhere, but is exported for convenience. Given an environment and a collection of transformations, it populates the environment with the global variables in the transformations.

Usage

setupTransformEnv(transFns, env = NULL)

Arguments

transFns
from the "transforms" attribute of a ddo object
env
the environment in which to evaluate the transformations

Specify Control Parameters for Spark Job

Description

Specify control parameters for a Spark job. The function currently takes no arguments.

Usage

sparkControl()

Connect to Spark Data Source

Description

Connect to a Spark data source, potentially on HDFS

Usage

sparkDataConn(loc, type = "object", hdfs = FALSE, init = list(), autoYes = FALSE, reset = FALSE, verbose = TRUE)

Arguments

loc
location on the file system (typically HDFS) for the data source
type
is it a "text" file or "object" (default) file?
hdfs
is the file on HDFS?
init
if a SparkContext has not been initialized with sparkR.init, a named list of arguments to be passed to sparkR.init to initialize a SparkContext
autoYes
automatically answer "yes" to questions about creating a path if missing
reset
should existing metadata for this object be overwritten?
verbose
logical - print messages about what is being done

Value

a "kvConnection" object of class "sparkDataConn"

Details

This simply creates a "connection" to a directory (which need not have data in it). To actually do things with this data, see ddo, etc.

See also

addData, ddo, ddf, sparkDataConn

Author

Ryan Hafen

Update Attributes of a 'ddo' or 'ddf' Object

Description

Update attributes of a 'ddo' or 'ddf' object

Usage

updateAttributes(obj, control = NULL)

Arguments

obj
an object of class 'ddo' or 'ddf'
control
parameters specifying how the backend should handle things (most-likely parameters to rhwatch in RHIPE) - see rhipeControl

Value

an object of class 'ddo' or 'ddf'

Details

This function looks for missing attributes related to a ddo or ddf (distributed data object or data frame) object and runs MapReduce to update them. These attributes include "splitSizeDistn", "keys", "nDiv", "nRow", and "splitRowDistn". These attributes are useful for subsequent computations that might rely on them. The result is the input modified to reflect the updated attributes, and thus it should be used as obj <- updateAttributes(obj).

References

Bennett, Janine, et al. "Numerically stable, single-pass, parallel statistics algorithms." Cluster Computing and Workshops, 2009. CLUSTER '09. IEEE International Conference on. IEEE, 2009.

See also

ddo, ddf, divide

Author

Ryan Hafen