Authors: Ryan Hafen [aut, cre], Landon Sego [ctb]
Version: 0.8.5
License: BSD_3_clause + file LICENSE
Methods for dividing data into subsets, applying analytical methods to the subsets, and recombining the results. Comes with a generic MapReduce interface as well. Works with key-value pairs stored in memory, on local disk, or on HDFS, in the latter case using the R and Hadoop Integrated Programming Environment (RHIPE).
(none)
data.table (>= 1.9.6), digest, codetools, hexbin, parallel, magrittr, dplyr, methods
testthat (>= 0.11.0), roxygen2 (>= 5.0.1), Rhipe
(none)
Specify a key-value pair
kvPair(k, v)
kvPair("name", "bob")
Specify a collection of key-value pairs
kvPairs(...)
kvPairs(kvPair(1, letters), kvPair(2, rnorm(10)))
print.kvPair(x, ...)
kvPair(1, letters)
print.kvValue(x, ...)
kvPair(1, letters)
Apply a function to a single key-value pair - not a traditional R “apply” function.
kvApply(kvPair, fn)
Determines how a function should be applied to a key-value pair and then applies it: if the function has two formals, it is given the key and the value as arguments; if it has one formal, it is given just the value. The function is assumed to return a value unless the result is a kvPair
object. When the function returns a value, the original key is retained in the resulting key-value pair.
This provides flexibility and simplicity for when a function is only meant to be applied to the value (the most common case), but still allows keys to be used if desired.
kv <- kvPair(1, 2)
kv
kvApply(kv, function(x) x^2)
kvApply(kv, function(k, v) v^2)
kvApply(kv, function(k, v) k + v)
kvApply(kv, function(x) kvPair("new_key", x))
Instantiate a distributed data object (‘ddo’)
ddo(conn, update = FALSE, reset = FALSE, control = NULL, verbose = TRUE)
localDiskConn
or hdfsConn
, or a data frame or list of key-value pairs
updateAttributes
for more details.
rhwatch
in RHIPE) - see rhipeControl
and localDiskControl
kv <- kvPairs(kvPair(1, letters), kvPair(2, rnorm(100)))
kvddo <- ddo(kv)
kvddo
Instantiate a distributed data frame (‘ddf’)
ddf(conn, transFn = NULL, update = FALSE, reset = FALSE, control = NULL,
verbose = TRUE)
localDiskConn
or hdfsConn
, or a data frame or list of key-value pairs
updateAttributes
for more details.
rhwatch
in RHIPE) - see rhipeControl
and localDiskControl
# in-memory ddf
d <- ddf(iris)
d
# local disk ddf
conn <- localDiskConn(tempfile(), autoYes = TRUE)
addData(conn, list(list("1", iris[1:10,])))
addData(conn, list(list("2", iris[11:110,])))
addData(conn, list(list("3", iris[111:150,])))
dl <- ddf(conn)
dl
# hdfs ddf (requires RHIPE / Hadoop)
# connect to empty HDFS directory
conn <- hdfsConn("/tmp/irisSplit")
# add some data
addData(conn, list(list("1", iris[1:10,])))
addData(conn, list(list("2", iris[11:110,])))
addData(conn, list(list("3", iris[111:150,])))
# represent it as a distributed data frame
hdd <- ddf(conn)
Update attributes of a ‘ddo’ or ‘ddf’ object
updateAttributes(obj, control = NULL)
rhwatch
in RHIPE) - see rhipeControl
This function looks for missing attributes related to a ddo or ddf (distributed data object or data frame) object and runs MapReduce to update them. These attributes include “splitSizeDistn”, “keys”, “nDiv”, “nRow”, and “splitRowDistn”. These attributes are useful for subsequent computations that might rely on them. The result is the input modified to reflect the updated attributes, and thus it should be used as obj <- updateAttributes(obj)
.
Bennett, Janine, et al. “Numerically stable, single-pass, parallel statistics algorithms.” Cluster Computing and Workshops, 2009. CLUSTER’09. IEEE International Conference on. IEEE, 2009.
d <- divide(iris, by = "Species")
# some attributes are missing:
d
summary(d)
d <- updateAttributes(d)
# now all attributes are available:
d
summary(d)
Ryan Hafen
splitRowDistn(x)
summary.ddo(object, ...)
summary.ddf(object, ...)
nrow(x)
NROW(x)
ncol(x)
NCOL(x)
nrow.ddf(x)
NROW.ddf(x)
ncol.ddf(x)
NCOL.ddf(x)
names.ddf(x)
d <- divide(iris, by = "Species", update = TRUE)
nrow(d)
ncol(d)
length(d)
names(d)
summary(d)
getKeys(d)
Accessor functions for attributes of ddo/ddf objects. Methods also include nrow
and ncol
for ddf objects.
kvExample(x)
bsvInfo(x)
counters(x)
splitSizeDistn(x)
getKeys(x)
hasExtractableKV(x)
length.ddo(x)
d <- divide(iris, by = "Species", update = TRUE)
nrow(d)
ncol(d)
length(d)
names(d)
summary(d)
getKeys(d)
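The remaining accessors listed above can be used in the same way; a brief sketch, reusing the updated ddf d from the example above:
# an example key-value pair, useful for seeing the structure of a subset
kvExample(d)
# can subsets be extracted from this object by key?
hasExtractableKV(d)
# distribution of subset sizes (available because update = TRUE)
splitSizeDistn(d)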
These are called internally in various datadr functions. They are not meant for use outside of there, but are exported for convenience, and can be useful for better understanding ddo/ddf objects.
setAttributes(obj, attrs)
setAttributes.ddf(obj, attrs)
setAttributes.ddo(obj, attrs)
getAttribute(obj, attrName)
getAttributes(obj, attrNames)
getAttributes.ddf(obj, attrNames)
getAttributes.ddo(obj, attrNames)
hasAttributes(obj, ...)
hasAttributes.ddf(obj, attrNames)
d <- divide(iris, by = "Species")
getAttribute(d, "keys")
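Several attributes can also be queried at once; a minimal sketch using attribute names mentioned under updateAttributes (whether a given attribute is already present before updateAttributes has been run can vary):
# retrieve more than one attribute at a time (returns a list)
getAttributes(d, c("keys", "nDiv"))
# check whether a given attribute has been computed yet
hasAttributes(d, "splitSizeDistn")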
Print an overview of attributes of distributed data objects (ddo) or distributed data frames (ddf)
print.ddo(x, ...)
kv <- kvPairs(kvPair(1, letters), kvPair(2, rnorm(100)))
kvddo <- ddo(kv)
kvddo
Ryan Hafen
Connect to a data source on local disk
localDiskConn(loc, nBins = 0, fileHashFn = NULL, autoYes = FALSE,
reset = FALSE, verbose = TRUE)
This simply creates a “connection” to a directory on local disk (which need not have data in it). To actually do things with this connection, see ddo
, etc. Typically, you should just use loc
to specify where the data is or where you would like data for this connection to be stored. Metadata for the object is also stored in this directory.
# connect to empty localDisk directory
conn <- localDiskConn(file.path(tempdir(), "irisSplit"), autoYes = TRUE)
# add some data
addData(conn, list(list("1", iris[1:10,])))
addData(conn, list(list("2", iris[11:110,])))
addData(conn, list(list("3", iris[111:150,])))
# represent it as a distributed data frame
irisDdf <- ddf(conn, update = TRUE)
irisDdf
addData
, ddo
, ddf
, localDiskConn
Ryan Hafen
Function to be used to specify the file where key-value pairs get stored for local disk connections, useful when keys are arbitrary objects. File names are determined using an MD5 hash of the object. This is the default argument for fileHashFn
in localDiskConn
.
digestFileHash(keys, conn)
You shouldn't need to call this directly other than to experiment with what the output looks like or to get ideas on how to write your own custom hash.
# connect to empty localDisk directory
path <- file.path(tempdir(), "irisSplit")
unlink(path, recursive = TRUE)
conn <- localDiskConn(path, autoYes = TRUE, fileHashFn = digestFileHash)
# add some data
addData(conn, list(list("key1", iris[1:10,])))
addData(conn, list(list("key2", iris[11:110,])))
addData(conn, list(list("key3", iris[111:150,])))
# see that files were stored by their key
list.files(path)
Ryan Hafen
Function to be used to specify the file where key-value pairs get stored for local disk connections, useful when keys are scalar strings. Should be passed as the argument fileHashFn
to localDiskConn
.
charFileHash(keys, conn)
You shouldn't need to call this directly other than to experiment with what the output looks like or to get ideas on how to write your own custom hash.
# connect to empty localDisk directory
path <- file.path(tempdir(), "irisSplit")
unlink(path, recursive = TRUE)
conn <- localDiskConn(path, autoYes = TRUE, fileHashFn = charFileHash)
# add some data
addData(conn, list(list("key1", iris[1:10,])))
addData(conn, list(list("key2", iris[11:110,])))
addData(conn, list(list("key3", iris[111:150,])))
# see that files were stored by their key
list.files(path)
Ryan Hafen
Specify control parameters for a MapReduce on a local disk connection. Currently the parameters include:
localDiskControl(cluster = NULL, map_buff_size_bytes = 10485760,
reduce_buff_size_bytes = 10485760, map_temp_buff_size_bytes = 10485760)
makeCluster
to allow for parallel processing
If you have data on a shared drive that multiple nodes can access or a high performance shared file system like Lustre, you can run a local disk MapReduce job on multiple nodes by creating a multi-node cluster with makeCluster
.
If you are using multiple cores and the input data is very small, map_buff_size_bytes
needs to be small so that the key-value pairs will be split across cores (see the sketch after the example below).
# create a 2-node cluster that can be used to process in parallel
cl <- parallel::makeCluster(2)
# create a local disk control object that specifies to use this cluster
# these operations run in parallel
control <- localDiskControl(cluster = cl)
# note that setting options(defaultLocalDiskControl = control)
# will cause this to be used by default in all local disk operations
# convert in-memory ddf to local-disk ddf
ldPath <- file.path(tempdir(), "by_species")
ldConn <- localDiskConn(ldPath, autoYes = TRUE)
bySpeciesLD <- convert(divide(iris, by = "Species"), ldConn)
# update attributes using parallel cluster
updateAttributes(bySpeciesLD, control = control)
# remove temporary directories
unlink(ldPath, recursive = TRUE)
# shut down the cluster
parallel::stopCluster(cl)
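As noted above, a smaller map buffer helps when the input data is very small; a hedged sketch (the byte value here is purely illustrative):
cl2 <- parallel::makeCluster(2)
# a small map buffer so that even a tiny input gets split across both cores
smallControl <- localDiskControl(cluster = cl2, map_buff_size_bytes = 1024)
parallel::stopCluster(cl2)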
Connect to a data source on HDFS
hdfsConn(loc, type = "sequence", autoYes = FALSE, reset = FALSE,
verbose = TRUE)
This simply creates a “connection” to a directory on HDFS (which need not have data in it). To actually do things with this data, see ddo
, etc.
# connect to empty HDFS directory
conn <- hdfsConn("/test/irisSplit")
# add some data
addData(conn, list(list("1", iris[1:10,])))
addData(conn, list(list("2", iris[11:110,])))
addData(conn, list(list("3", iris[111:150,])))
# represent it as a distributed data frame
hdd <- ddf(conn)
addData
, ddo
, ddf
, localDiskConn
Ryan Hafen
Specify control parameters for a RHIPE job. See rhwatch
for details about each of the parameters.
rhipeControl(mapred = NULL, setup = NULL, combiner = FALSE,
cleanup = NULL, orderby = "bytes", shared = NULL, jarfiles = NULL,
zips = NULL, jobname = "")
rhwatch
in RHIPE
# input data on HDFS
d <- ddf(hdfsConn("/path/to/big/data/on/hdfs"))
# set RHIPE / Hadoop parameters
# buffer sizes control how many k/v pairs are sent to map / reduce tasks at a time
# mapred.reduce.tasks is a Hadoop config parameter that controls # of reduce tasks
rhctl <- rhipeControl(mapred = list(
rhipe_map_buff_size = 10000,
mapred.reduce.tasks = 72,
rhipe_reduce_buff_size = 1))
# divide input data using these control parameters
divide(d, by = "var", output = hdfsConn("/path/to/output"), control = rhctl)
Add key-value pairs to a data connection
addData(conn, data, overwrite = FALSE)
This is generally not recommended for HDFS as it writes a new file each time it is called, and can result in more individual files than Hadoop likes to deal with.
# connect to empty HDFS directory
conn <- hdfsConn("/test/irisSplit")
# add some data
addData(conn, list(list("1", iris[1:10,])))
addData(conn, list(list("2", iris[11:110,])))
addData(conn, list(list("3", iris[111:150,])))
# represent it as a distributed data frame
hdd <- ddf(conn)
removeData
, localDiskConn
, hdfsConn
Ryan Hafen
Remove key-value pairs from a data connection
removeData(conn, keys)
This is generally not recommended for HDFS as it writes a new file each time it is called, and can result in more individual files than Hadoop likes to deal with.
# connect to empty localDisk directory
conn <- localDiskConn(file.path(tempdir(), "irisSplit"), autoYes = TRUE)
# add some data
addData(conn, list(list("1", iris[1:10,])))
addData(conn, list(list("2", iris[11:90,])))
addData(conn, list(list("3", iris[91:110,])))
addData(conn, list(list("4", iris[111:150,])))
# represent it as a distributed data frame
irisDdf <- ddf(conn, update = TRUE)
irisDdf
# remove data for keys "1" and "2"
removeData(conn, list("1", "2"))
# look at result with updated attributes (reset = TRUE removes previous attrs)
irisDdf <- ddf(conn, reset = TRUE, update = TRUE)
irisDdf
removeData
, localDiskConn
, hdfsConn
Ryan Hafen
Reads a text file in table format and creates a distributed data frame from it, with cases corresponding to lines and variables to fields in the file.
drRead.table(file, header = FALSE, sep = "", quote = "\"'", dec = ".",
skip = 0, fill = !blank.lines.skip, blank.lines.skip = TRUE, comment.char = "#",
allowEscapes = FALSE, encoding = "unknown", autoColClasses = TRUE,
rowsPerBlock = 50000, postTransFn = identity, output = NULL, overwrite = FALSE,
params = NULL, packages = NULL, control = NULL, ...)
drRead.csv(file, header = TRUE, sep = ",",
quote = "\"", dec = ".", fill = TRUE, comment.char = "", ...)
drRead.csv2(file, header = TRUE, sep = ";",
quote = "\"", dec = ",", fill = TRUE, comment.char = "", ...)
drRead.delim(file, header = TRUE, sep = "\t",
quote = "\"", dec = ".", fill = TRUE, comment.char = "", ...)
drRead.delim2(file, header = TRUE, sep = "\t",
quote = "\"", dec = ",", fill = TRUE, comment.char = "", ...)
hdfsConn
object pointing to a text file on HDFS (see output
argument below)
The remaining arguments are passed to read.table for each chunk being processed - see read.table for more info. Almost all have defaults, or appropriate defaults are set through other format-specific functions such as drRead.csv and drRead.delim.
For autoColClasses, keeping the default of TRUE is advantageous for speed.
localDiskConn
object if input is a text file on local disk, or a hdfsConn
object if input is a text file on HDFS.
overwrite = “backup”
to move the existing output to _bak)
postTransFn
fn
(most should be taken care of automatically such that this is rarely necessary to specify)
rhwatch
in RHIPE) - see rhipeControl
and localDiskControl
read.table
for more info
For local disk, the file is actually read in sequentially instead of in parallel. This is because of possible performance issues when trying to read from the same disk in parallel.
Note that if skip
is positive and/or if header
is TRUE
, these lines are read in first since they only occur once in the data; each block is then checked for them and they are removed if they appear.
Also note that if you supply “Factor”
column classes, they will be converted to character.
csvFile <- file.path(tempdir(), "iris.csv")
write.csv(iris, file = csvFile, row.names = FALSE, quote = FALSE)
irisTextConn <- localDiskConn(file.path(tempdir(), "irisText2"), autoYes = TRUE)
a <- drRead.csv(csvFile, output = irisTextConn, rowsPerBlock = 10)
Ryan Hafen
Experimental helper function for reading text data on HDFS into a HDFS connection
readHDFStextFile(input, output = NULL, overwrite = FALSE, fn = NULL,
keyFn = NULL, linesPerBlock = 10000, control = NULL, update = FALSE)
rhfmt
localDiskConn
, and hdfsConn
overwrite = “backup”
to move the existing output to _bak)
rhwatch
in RHIPE) - see rhipeControl
and localDiskControl
res <- readHDFStextFile(
input = Rhipe::rhfmt("/path/to/input/text", type = "text"),
output = hdfsConn("/path/to/output"),
fn = function(x) {
read.csv(textConnection(paste(x, collapse = "\n")), header = FALSE)
}
)
Experimental helper function for reading text data sequentially from a file on disk and adding to connection using addData
readTextFileByChunk(input, output, overwrite = FALSE, linesPerBlock = 10000,
fn = NULL, header = TRUE, skip = 0, recordEndRegex = NULL,
cl = NULL)
localDiskConn
, and hdfsConn
overwrite = “backup”
to move the existing output to _bak)
makeCluster
The function fn
should have one argument, which should expect to receive a vector of strings, each element of which is a line in the file. It is also possible for fn
to take two arguments, in which case the second argument is the header line from the file (some parsing methods might need to know the header).
csvFile <- file.path(tempdir(), "iris.csv")
write.csv(iris, file = csvFile, row.names = FALSE, quote = FALSE)
myoutput <- localDiskConn(file.path(tempdir(), "irisText"), autoYes = TRUE)
a <- readTextFileByChunk(csvFile,
output = myoutput, linesPerBlock = 10,
fn = function(x, header) {
colNames <- strsplit(header, ",")[[1]]
read.csv(textConnection(paste(x, collapse = "\n")), col.names = colNames, header = FALSE)
})
a[[1]]
Convert ‘ddo’ / ‘ddf’ objects between different storage backends
convert(from, to, overwrite = FALSE)
localDiskConn
or hdfsConn
) or NULL
if an in-memory ddo / ddf is desired
to
be overwritten?
d <- divide(iris, by = "Species")
# convert in-memory ddf to one stored on disk
dl <- convert(d, localDiskConn(tempfile(), autoYes = TRUE))
dl
Rbind all the rows of a ‘ddf’ object into a single data frame
as.data.frame.ddf(x, row.names = NULL, optional = FALSE,
keys = TRUE, splitVars = TRUE, bsvs = FALSE, ...)
as.data.frame
as.data.frame
d <- divide(iris, by = "Species")
as.data.frame(d)
as.list.ddo(x, ...)
as.list
d <- divide(iris, by = "Species")
as.list(d)
to_ddf(x)
library(dplyr)
bySpecies <- iris %>%
group_by(Species) %>%
to_ddf()
Aggregates data by cross-classifying factors, with a formula interface similar to xtabs
drAggregate(data, formula, by = NULL, output = NULL, preTransFn = NULL,
maxUnique = NULL, params = NULL, packages = NULL, control = NULL)
formula
formula
object with the cross-classifying variables (separated by +) on the right hand side (or an object which can be coerced to a formula). Interactions are not allowed. On the left hand side, one may optionally give the name of a variable in the data representing counts; this is useful if the data have already been tabulated.
by
being specified (see localDiskConn
, hdfsConn
). If NULL
(default), output will be an in-memory “ddo” object.
addTransform
prior to calling divide.
NULL
, it is ignored. If a positive number, only the top and bottom maxUnique
tabulations by frequency are kept.
fn
(most should be taken care of automatically such that this is rarely necessary to specify)
rhwatch
in RHIPE) - see rhipeControl
and localDiskControl
The interface is similar to xtabs
, but instead of returning a full contingency table, data is returned in the form of a data frame only with rows for which there were positive counts. This result is more similar to what is returned by aggregate
.
drAggregate(Sepal.Length ~ Species, data = ddf(iris))
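A one-sided formula simply tabulates counts for each combination of the cross-classifying variables; a brief sketch (the binned variable slCut is created here only for illustration):
irisCut <- transform(iris, slCut = cut(Sepal.Length, 3))
drAggregate(~ Species + slCut, data = ddf(irisCut))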
Ryan Hafen
Create “hexbin” object of hexagonally binned data for a distributed data frame. This computation is division agnostic - it does not matter how the data frame is split up.
drHexbin(data, xVar, yVar, by = NULL, xTransFn = identity,
yTransFn = identity, xRange = NULL, yRange = NULL, xbins = 30,
shape = 1, params = NULL, packages = NULL, control = NULL)
fn
(most should be taken care of automatically such that this is rarely necessary to specify)
rhwatch
in RHIPE) - see rhipeControl
and localDiskControl
Carr, D. B. et al. (1987) Scatterplot Matrix Techniques for Large N. JASA 83, 398, 424–436.
# create dummy data and divide it
dat <- data.frame(
xx = rnorm(1000),
yy = rnorm(1000),
by = sample(letters, 1000, replace = TRUE))
d <- divide(dat, by = "by", update = TRUE)
# compute hexbins on divided object
dhex <- drHexbin(d, xVar = "xx", yVar = "yy")
# dhex is equivalent to running on undivided data:
hexbin(dat$xx, dat$yy)
Ryan Hafen
Compute sample quantiles for ‘ddf’ objects
drQuantile(x, var, by = NULL, probs = seq(0, 1, 0.005), preTransFn = NULL,
varTransFn = identity, varRange = NULL, nBins = 10000, tails = 100,
params = NULL, packages = NULL, control = NULL, ...)
var
(use varTransFn
for that) - also note: this is deprecated - instead use addTransform
prior to calling divide
fn
(most should be taken care of automatically such that this is rarely necessary to specify)
rhwatch
in RHIPE) - see rhipeControl
and localDiskControl
This division-agnostic quantile calculation algorithm takes the range of the variable of interest and splits it into nBins
bins, tabulates counts for those bins, and reconstructs a quantile approximation from them. nBins
should not get too large, but larger nBins
gives more accuracy. If tails
is positive, the first and last tails
ordered values are attached to the quantile estimate - this is useful for long-tailed distributions or distributions with outliers for which you would like more detail in the tails.
q
and their associated f-value fval
. If by
is specified, then also a variable group
.
# break the iris data into k/v pairs
irisSplit <- list(
list("1", iris[1:10,]), list("2", iris[11:110,]), list("3", iris[111:150,])
)
# represent it as ddf
irisSplit <- ddf(irisSplit, update = TRUE)
# approximate quantiles over the divided data set
probs <- seq(0, 1, 0.005)
iq <- drQuantile(irisSplit, var = "Sepal.Length", tails = 0, probs = probs)
plot(iq$fval, iq$q)
# compare to the all-data quantile "type 1" result
plot(probs, quantile(iris$Sepal.Length, probs = probs, type = 1))
Ryan Hafen
Divide a ddo/ddf object into subsets based on different criteria
divide(data, by = NULL, spill = 1000000, filterFn = NULL, bsvFn = NULL,
output = NULL, overwrite = FALSE, preTransFn = NULL,
postTransFn = NULL, params = NULL, packages = NULL, control = NULL,
update = FALSE, verbose = TRUE)
preTransFn
to coerce each subset into a data frame
TRUE
) part of the resulting division
localDiskConn
, hdfsConn
). If NULL
(default), output will be an in-memory “ddo” object.
overwrite = “backup”
to move the existing output to _bak)
addTransform
prior to calling divide
fn
(most should be taken care of automatically such that this is rarely necessary to specify)
rhwatch
in RHIPE) - see rhipeControl
and localDiskControl
The division methods this function will support include conditioning variable division for factors (implemented – see condDiv
), conditioning variable division for numerical variables through shingles, random replicate (implemented – see rrDiv
), and near-exact replicate. If by
is a vector of variable names, the data will be divided by these variables. Alternatively, this can be specified by e.g. condDiv(c("var1", "var2"))
.
# divide iris data by Species by passing in a data frame
bySpecies <- divide(iris, by = "Species")
bySpecies
# divide iris data into random partitioning of ~30 rows per subset
irisRR <- divide(iris, by = rrDiv(30))
irisRR
# any ddf can be passed into divide:
irisRR2 <- divide(bySpecies, by = rrDiv(30))
irisRR2
bySpecies2 <- divide(irisRR2, by = "Species")
bySpecies2
# splitting on multiple columns
byEdSex <- divide(adult, by = c("education", "sex"))
byEdSex
byEdSex[[1]]
# splitting on a numeric variable
bySL <- ddf(iris) %>%
addTransform(function(x) {
x$slCut <- cut(x$Sepal.Length, 10)
x
}) %>%
divide(by = "slCut")
bySL
bySL[[1]]
recombine
, ddo
, ddf
, condDiv
, rrDiv
Ryan Hafen
Specify conditioning variable division parameters for data division
condDiv(vars)
Currently each unique combination of values of vars
constitutes a subset. In the future, specifying shingles for numeric conditioning variables will be implemented.
divide
d <- divide(iris, by = "Species")
# equivalent:
d <- divide(iris, by = condDiv("Species"))
divide
, getSplitVars
, getSplitVar
Ryan Hafen
Specify random replicate division parameters for data division
rrDiv(nrows = NULL, seed = NULL)
The random replicate division method currently gets the total number of rows of the input data and divides it by nrows
to get the number of subsets. Then it randomly assigns each row of the input data to one of the subsets, resulting in subsets with approximately nrows
rows. A future implementation will make each subset have exactly nrows
rows.
divide
# divide iris data into random subsets with ~20 records per subset
irisRR <- divide(iris, by = rrDiv(20), update = TRUE)
irisRR
# look at the actual distribution of number of rows per subset
plot(splitRowDistn(irisRR))
Ryan Hafen
Add a transformation function to be applied to each subset of a distributed data object
addTransform(obj, fn, name = NULL, params = NULL, packages = NULL)
obj
- see details
obj
that are needed in the transformation function (most should be taken care of automatically such that this is rarely necessary to specify)
fn
(most should be taken care of automatically such that this is rarely necessary to specify)
When you add a transformation to a distributed data object, the transformation is not applied immediately, but is deferred until a function that kicks off a computation is done. These include divide
, recombine
, drJoin
, drLapply
, drFilter
, drSample
, drSubset
. When any of these are invoked on an object with a transformation attached to it, the transformation will be applied in the map phase of the MapReduce computation prior to any other computation. The transformation will also be applied any time a subset of the data is requested. Although the data has not been physically transformed after a call of addTransform
, we can think of it conceptually as already being transformed.
To force the transformation to be immediately calculated on all subsets use: drPersist(dat, output = …)
.
The function provided by fn
can either accept one or two parameters. If it accepts one parameter, the value of a key-value pair is passed in. It if accepts two parameters, it is passed the key as the first parameter and the value as the second parameter. The return value of fn
is treated as a value of a key-value pair unless the return type comes from kvPair
.
When addTransform
is called, it is tested on a subset of the data to make sure all of the necessary global variables and packages are available to portably perform the transformation.
It is possible to add multiple transformations to a distributed data object, in which case they are applied in the order supplied, but only one transform should be necessary.
The transformation function must not return NULL on any data subset, although it can return an empty object of the correct shape to match other subsets (e.g. a data.frame with the correct columns but zero rows).
obj
, with the transformation included as one of the attributes of the returned object.
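As a sketch of the rule above that a transform must not return NULL, a transform can instead return a zero-row data frame with the expected columns for subsets it effectively drops (the threshold used here is arbitrary):
bySpecies <- divide(iris, by = "Species")
# returns zero rows (but the correct columns) rather than NULL for some subsets
keepLong <- function(x) x[x$Sepal.Length > 7, , drop = FALSE]
longOnly <- addTransform(bySpecies, keepLong)
longOnly[[1]]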
# Create a distributed data frame using the iris data set, backed by the
# kvMemory (in memory) connection
bySpecies <- divide(iris, by = "Species")
bySpecies
# Note a transformation is not present in the attributes
names(attributes(bySpecies))
## A transform that operates only on values of the key-value pairs
##----------------------------------------------------------------
# Create a function that will calculate the mean of each variable
# in a subset. The calls to 'as.data.frame()' and 't()' convert the
# vector output of 'apply()' into a data.frame with a single row
colMean <- function(x) as.data.frame(t(apply(x, 2, mean)))
# Test on a subset
colMean(bySpecies[[1]][[2]])
# Add a transformation that will calculate the mean of each variable
bySpeciesTransformed <- addTransform(bySpecies, colMean)
# Note how 'before transformation' appears to describe the values of
# several of the attributes
bySpeciesTransformed
# Note the addition of the transformation to the attributes
names(attributes(bySpeciesTransformed))
# We can see the result of the transformation by looking at one of
# the subsets:
bySpeciesTransformed[[1]]
# The transformation is automatically applied when calling any data
# operation. For example, if we call 'recombine()' with 'combRbind'
# we will get a data frame of the column means for each subset:
varMeans <- recombine(bySpeciesTransformed, combine = combRbind)
varMeans
## A transform that operates on both keys and values
##---------------------------------------------------------
# We can also create a transformation that uses both the keys and values
# It will select the first row of the value, and append '-firstRow' to
# the key
aTransform <- function(key, val) {
newKey <- paste(key, "firstRow", sep = "-")
newVal <- val[1,]
kvPair(newKey, newVal)
}
# Apply the transformation
recombine(addTransform(bySpecies, aTransform))
This is called internally in the map phase of datadr MapReduce jobs. It is not meant for use outside of there, but is exported for convenience.
applyTransform(transFns, x, env = NULL)
setupTransformEnv
) - if NULL
, the environment will be set up for you
# Create a distributed data frame using the iris data set, backed by the
# kvMemory (in memory) connection
bySpecies <- divide(iris, by = "Species")
bySpecies
# Note a transformation is not present in the attributes
names(attributes(bySpecies))
## A transform that operates only on values of the key-value pairs
##----------------------------------------------------------------
# Create a function that will calculate the mean of each variable
# in a subset. The calls to 'as.data.frame()' and 't()' convert the
# vector output of 'apply()' into a data.frame with a single row
colMean <- function(x) as.data.frame(t(apply(x, 2, mean)))
# Test on a subset
colMean(bySpecies[[1]][[2]])
# Add a transformation that will calculate the mean of each variable
bySpeciesTransformed <- addTransform(bySpecies, colMean)
# Note how 'before transformation' appears to describe the values of
# several of the attributes
bySpeciesTransformed
# Note the addition of the transformation to the attributes
names(attributes(bySpeciesTransformed))
# We can see the result of the transformation by looking at one of
# the subsets:
bySpeciesTransformed[[1]]
# The transformation is automatically applied when calling any data
# operation. For example, if we call 'recombine()' with 'combRbind'
# we will get a data frame of the column means for each subset:
varMeans <- recombine(bySpeciesTransformed, combine = combRbind)
varMeans
## A transform that operates on both keys and values
##---------------------------------------------------------
# We can also create a transformation that uses both the keys and values
# It will select the first row of the value, and append '-firstRow' to
# the key
aTransform <- function(key, val) {
newKey <- paste(key, "firstRow", sep = "-")
newVal <- val[1,]
kvPair(newKey, newVal)
}
# Apply the transformation
recombine(addTransform(bySpecies, aTransform))
This is called internally in the map phase of datadr MapReduce jobs. It is not meant for use outside of there, but is exported for convenience. Given an environment and collection of transformations, it populates the environment with the global variables in the transformations.
setupTransformEnv(transFns, env = NULL)
# Create a distributed data frame using the iris data set, backed by the
# kvMemory (in memory) connection
bySpecies <- divide(iris, by = "Species")
bySpecies
# Note a transformation is not present in the attributes
names(attributes(bySpecies))
## A transform that operates only on values of the key-value pairs
##----------------------------------------------------------------
# Create a function that will calculate the mean of each variable
# in a subset. The calls to 'as.data.frame()' and 't()' convert the
# vector output of 'apply()' into a data.frame with a single row
colMean <- function(x) as.data.frame(t(apply(x, 2, mean)))
# Test on a subset
colMean(bySpecies[[1]][[2]])
# Add a transformation that will calculate the mean of each variable
bySpeciesTransformed <- addTransform(bySpecies, colMean)
# Note how 'before transformation' appears to describe the values of
# several of the attributes
bySpeciesTransformed
# Note the addition of the transformation to the attributes
names(attributes(bySpeciesTransformed))
# We can see the result of the transformation by looking at one of
# the subsets:
bySpeciesTransformed[[1]]
# The transformation is automatically applied when calling any data
# operation. For example, if we call 'recombine()' with 'combRbind'
# we will get a data frame of the column means for each subset:
varMeans <- recombine(bySpeciesTransformed, combine = combRbind)
varMeans
## A transform that operates on both keys and values
##---------------------------------------------------------
# We can also create a transformation that uses both the keys and values
# It will select the first row of the value, and append '-firstRow' to
# the key
aTransform <- function(key, val) {
newKey <- paste(key, "firstRow", sep = "-")
newVal <- val[1,]
kvPair(newKey, newVal)
}
# Apply the transformation
recombine(addTransform(bySpecies, aTransform))
Persist a transformed ‘ddo’ or ‘ddf’ object by making a deferred transformation permanent
drPersist(x, output = NULL, overwrite = FALSE, control = NULL)
localDiskConn
, hdfsConn
). If NULL
(default), output will be an in-memory “ddo” object.
overwrite = “backup”
to move the existing output to _bak)
rhwatch
in RHIPE) - see rhipeControl
and localDiskControl
When a transformation is added to a ddf/ddo via addTransform
, the transformation is deferred until some action is taken with the data (e.g. a call to recombine
). See the documentation of addTransform
for more information about the nature of transformations.
Calling drPersist()
on the ddo/ddf makes the transformation permanent (persisted). In the case of a local disk connection (via localDiskConn
) or HDFS connection (via hdfsConn
), the transformed data are written to disk.
bySpecies <- divide(iris, by = "Species")
# Create the transformation and add it to bySpecies
bySpeciesSepal <- addTransform(bySpecies, function(x) x[,c("Sepal.Length", "Sepal.Width")])
# Note the transformation is 'pending' a data action
bySpeciesSepal
# Make the transformation permanent (persistent)
bySpeciesSepalPersisted <- drPersist(bySpeciesSepal)
# The transformation is no longer pending, but is a permanent part of the new ddo
bySpeciesSepalPersisted
bySpeciesSepalPersisted[[1]]
Ryan Hafen
Apply an analytic recombination method to a ddo/ddf object and combine the results
recombine(data, combine = NULL, apply = NULL, output = NULL,
overwrite = FALSE, params = NULL, packages = NULL, control = NULL,
verbose = TRUE)
combCollect
, combDdf
, combDdo
, combRbind
, etc. If combine = NULL
, combCollect
will be used if output = NULL
and combDdo
is used if output
is specified.
drBLB
, drGLM
, for example). NOTE: This argument is now deprecated in favor of addTransform
localDiskConn
, hdfsConn
). If NULL
(default), output will be an in-memory “ddo” object
overwrite = “backup”
to move the existing output to _bak)
fn
(most should be taken care of automatically such that this is rarely necessary to specify)
rhwatch
in RHIPE) - see rhipeControl
and localDiskControl
combine
: this could be a distributed data object, a data frame, a key-value list, etc. See examples.
## in-memory example
##---------------------------------------------------------
# begin with an in-memory ddf (backed by kvMemory)
bySpecies <- divide(iris, by = "Species")
# create a function to calculate the mean for each variable
colMean <- function(x) data.frame(lapply(x, mean))
# apply the transformation
bySpeciesTransformed <- addTransform(bySpecies, colMean)
# recombination with no 'combine' argument and no argument to output
# produces the key-value list produced by 'combCollect()'
recombine(bySpeciesTransformed)
# but we can also preserve the distributed data frame, like this:
recombine(bySpeciesTransformed, combine = combDdf)
# or we can recombine using 'combRbind()' and produce a data frame:
recombine(bySpeciesTransformed, combine = combRbind)
## local disk connection example with parallelization
##---------------------------------------------------------
# create a 2-node cluster that can be used to process in parallel
cl <- parallel::makeCluster(2)
# create the control object we'll pass into local disk datadr operations
control <- localDiskControl(cluster = cl)
# note that setting options(defaultLocalDiskControl = control)
# will cause this to be used by default in all local disk operations
# create local disk connection to hold bySpecies data
ldPath <- file.path(tempdir(), "by_species")
ldConn <- localDiskConn(ldPath, autoYes = TRUE)
# convert in-memory bySpecies to local-disk ddf
bySpeciesLD <- convert(bySpecies, ldConn)
# apply the transformation
bySpeciesTransformed <- addTransform(bySpeciesLD, colMean)
# recombine the data using the transformation
bySpeciesMean <- recombine(bySpeciesTransformed,
combine = combRbind, control = control)
bySpeciesMean
# remove temporary directories
unlink(ldPath, recursive = TRUE)
# shut down the cluster
parallel::stopCluster(cl)
divide
, ddo
, ddf
, drGLM
, drBLB
, combMeanCoef
, combMean
, combCollect
, combRbind
, drLapply
Ryan Hafen
Bag of little bootstraps transformation method
drBLB(x, statistic, metric, R, n)
R
bootstrap samples of each statistic returned by statistic
. Expects an input vector and should output a vector.
It is necessary to specify weights
as a parameter to the statistic
function because for BLB to work efficiently, it must resample each time with a sample of size n
. To make this computationally possible for very large n
, we can use weights
(see reference for details). Therefore, only methods with a weights option can legitimately be used here.
Kleiner, Ariel, et al. “A scalable bootstrap for massive data.” Journal of the Royal Statistical Society: Series B (Statistical Methodology) 76.4 (2014): 795-816.
# BLB is meant to run on random replicate divisions
rrAdult <- divide(adult, by = rrDiv(1000), update = TRUE)
adultBlb <- rrAdult %>% addTransform(function(x) {
drBLB(x,
statistic = function(x, weights)
coef(glm(incomebin ~ educationnum + hoursperweek + sex,
data = x, weights = weights, family = binomial())),
metric = function(x)
quantile(x, c(0.05, 0.95)),
R = 100,
n = nrow(rrAdult)
)
})
# compute the mean of the resulting CI limits
# (this will take a little bit of time because of resampling)
coefs <- recombine(adultBlb, combMean)
matrix(coefs, ncol = 2, byrow = TRUE)
Ryan Hafen
LM transformation method – Fit a linear model to each subset
drLM(...)
lm
function
This provides a transformation function to be called for each subset in a recombination MapReduce job that applies R's lm method and outputs the coefficients in a way that combMeanCoef
knows how to deal with. It can be applied to a ddf with addTransform
prior to calling recombine
.
An object of class drCoef that contains the lm coefficients and other data needed by combMeanCoef
# Divide the data
bySpecies <- divide(iris, by = "Species")
# A function to fit a multiple linear regression model to each species
linearReg <- function(x)
drLM(Sepal.Length ~ Sepal.Width + Petal.Length + Petal.Width,
data = x)
# Apply the transform and combine using 'combMeanCoef'
bySpecies %>%
addTransform(linearReg) %>%
recombine(combMeanCoef)
Landon Sego
GLM transformation method – Fit a generalized linear model to each subset
drGLM(...)
glm
function
This provides a transformation function to be called for each subset in a recombination MapReduce job that applies R's glm method and outputs the coefficients in a way that combMeanCoef
knows how to deal with. It can be applied to a ddf with addTransform
prior to calling recombine
.
An object of class drCoef that contains the glm coefficients and other data needed by combMeanCoef
# Artificially dichotomize the Sepal.Lengths of the iris data to
# demonstrate a GLM model
irisD <- iris
irisD$Sepal <- as.numeric(irisD$Sepal.Length > median(irisD$Sepal.Length))
# Divide the data
bySpecies <- divide(irisD, by = "Species")
# A function to fit a logistic regression model to each species
logisticReg <- function(x)
drGLM(Sepal ~ Sepal.Width + Petal.Length + Petal.Width,
data = x, family = binomial())
# Apply the transform and combine using 'combMeanCoef'
bySpecies %>%
addTransform(logisticReg) %>%
recombine(combMeanCoef)
Ryan Hafen
“Collect” recombination - collect the results into a local list of key-value pairs
combCollect(...)
combCollect
is passed to the argument combine
in recombine
# Create a distributed data frame using the iris data set
bySpecies <- divide(iris, by = "Species")
# Function to calculate the mean of the petal widths
meanPetal <- function(x) mean(x$Petal.Width)
# Combine the results using rbind
combined <- recombine(addTransform(bySpecies, meanPetal), combine = combCollect)
class(combined)
combined
# A more concise (and readable) way to do it
bySpecies %>%
addTransform(meanPetal) %>%
recombine(combCollect)
divide
, recombine
, combDdo
, combDdf
, combMeanCoef
, combRbind
, combMean
Ryan Hafen
“DDO” recombination - simply collect the results into a “ddo” object
combDdo(...)
combDdo
is passed to the argument combine
in recombine
# Divide the iris data
bySpecies <- divide(iris, by = "Species")
# Add a transform that returns a list for each subset
listTrans <- function(x) {
list(meanPetalWidth = mean(x$Petal.Width),
maxPetalLength = max(x$Petal.Length))
}
# Apply the transform and combine using combDdo
combined <- recombine(addTransform(bySpecies, listTrans), combine = combDdo)
combined
combined[[1]]
# A more concise (and readable) way to do it
bySpecies %>%
addTransform(listTrans) %>%
recombine(combDdo)
divide
, recombine
, combCollect
, combMeanCoef
, combRbind
, combMean
Ryan Hafen
“DDF” recombination - results into a “ddf” object, rbinding if necessary
combDdf(...)
combDdf
is passed to the argument combine
in recombine
.
If the value
of the “ddo” object that will be recombined is a list, then the elements in the list will be collapsed together via rbind
.
# Divide the iris data
bySpecies <- divide(iris, by = "Species")
## Simple combination to form a ddf
##---------------------------------------------------------
# Add a transform that selects the petal width and length variables
selVars <- function(x) x[,c("Petal.Width", "Petal.Length")]
# Apply the transform and combine using combDdo
combined <- recombine(addTransform(bySpecies, selVars), combine = combDdf)
combined
combined[[1]]
# A more concise (and readable) way to do it
bySpecies %>%
addTransform(selVars) %>%
recombine(combDdf)
## Combination that involves rbinding to give the ddf
##---------------------------------------------------------
# A transformation that returns a list
listTrans <- function(x) {
list(meanPetalWidth = mean(x$Petal.Width),
maxPetalLength = max(x$Petal.Length))
}
# Apply the transformation and look at the result
bySpeciesTran <- addTransform(bySpecies, listTrans)
bySpeciesTran[[1]]
# And if we rbind the "value" of the first subset:
out1 <- do.call(rbind, bySpeciesTran[[1]]$value)
out1
# Note how the combDdf method row binds the list elements
combined <- recombine(bySpeciesTran, combine = combDdf)
out2 <- combined[[1]]
out2
# These are equivalent
identical(out1, out2$value)
divide
, recombine
, combCollect
, combMeanCoef
, combRbind
, combDdo
, combDdf
Ryan Hafen
“rbind” recombination - Combine ddf divisions by row binding
combRbind(...)
combRbind
is passed to the argument combine
in recombine
# Create a distributed data frame using the iris data set
bySpecies <- divide(iris, by = "Species")
# Create a function that will calculate the standard deviation of each
# variable in a subset. The calls to 'as.data.frame()' and 't()'
# convert the vector output of 'apply()' into a data.frame with a single row
sdCol <- function(x) as.data.frame(t(apply(x, 2, sd)))
# Combine the results using rbind
combined <- recombine(addTransform(bySpecies, sdCol), combine = combRbind)
class(combined)
combined
# A more concise (and readable) way to do it
bySpecies %>%
addTransform(sdCol) %>%
recombine(combRbind)
divide
, recombine
, combDdo
, combDdf
, combCollect
, combMeanCoef
, combMean
Ryan Hafen
Mean recombination – Calculate the elementwise mean of a vector in each value
combMean(...)
combMean
is passed to the argument combine
in recombine
This method assumes that the values of the key-value pairs each consist of a numeric vector (with the same length). The mean is calculated elementwise across all the keys.
# Create a distributed data frame using the iris data set
bySpecies <- divide(iris, by = "Species")
# Add a transformation that returns a vector of sums for each subset, one
# sum for each variable
bySpeciesTrans <- addTransform(bySpecies, function(x) apply(x, 2, sum))
bySpeciesTrans[[1]]
# Calculate the elementwise mean of the vector of sums produced by
# the transform, across the keys
out1 <- recombine(bySpeciesTrans, combine = combMean)
out1
# A more concise (and readable) way to do it
bySpecies %>%
addTransform(function(x) apply(x, 2, sum)) %>%
recombine(combMean)
# This manual, non-datadr approach illustrates the above computation
# This step mimics the transformation above
sums <- aggregate(. ~ Species, data = iris, sum)
sums
# And this step mimics the mean recombination
out2 <- apply(sums[,-1], 2, mean)
out2
# These are the same
identical(out1, out2)
divide
, recombine
, combCollect
, combDdo
, combDdf
, combRbind
, combMeanCoef
Ryan Hafen
Mean coefficient recombination – Calculate the weighted average of parameter estimates for a model fit to each subset
combMeanCoef(...)
combMeanCoef
is passed to the argument combine
in recombine
This method is designed to calculate the mean of each model coefficient, where the same model has been fit to subsets via a transformation. The mean is a weighted average of each coefficient, where the weights are the number of observations in each subset. In particular, drLM
and drGLM
functions should be used to add the transformation to the ddo that will be recombined using combMeanCoef
.
# Create an irregular number of observations for each species
indexes <- sort(c(sample(1:50, 40), sample(51:100, 37), sample(101:150, 46)))
irisIrr <- iris[indexes,]
# Create a distributed data frame using the irregular iris data set
bySpecies <- divide(irisIrr, by = "Species")
# Fit a linear model of Sepal.Length vs. Sepal.Width for each species
# using 'drLM()' (or we could have used 'drGLM()' for a generalized linear model)
lmTrans <- function(x) drLM(Sepal.Length ~ Sepal.Width, data = x)
bySpeciesFit <- addTransform(bySpecies, lmTrans)
# Average the coefficients from the linear model fits of each species, weighted
# by the number of observations in each species
out1 <- recombine(bySpeciesFit, combine = combMeanCoef)
out1
# A more concise (and readable) way to do it
bySpecies %>%
addTransform(lmTrans) %>%
recombine(combMeanCoef)
# The following illustrates an equivalent, but more tedious approach
lmTrans2 <- function(x) t(c(coef(lm(Sepal.Length ~ Sepal.Width, data = x)), n = nrow(x)))
res <- recombine(addTransform(bySpecies, lmTrans2), combine = combRbind)
colnames(res) <- c("Species", "Intercept", "Sepal.Width", "n")
res
out2 <- c("(Intercept)" = with(res, sum(Intercept * n) / sum(n)),
"Sepal.Width" = with(res, sum(Sepal.Width * n) / sum(n)))
# These are the same
identical(out1, out2)
divide
, recombine
, rrDiv
, combCollect
, combDdo
, combDdf
, combRbind
, combMean
Ryan Hafen
Filter a ‘ddo’ or ‘ddf’ object by selecting key-value pairs that satisfy a logical condition
drFilter(x, filterFn, output = NULL, overwrite = FALSE, params = NULL,
packages = NULL, control = NULL)
TRUE
or FALSE
- if TRUE
, that key-value pair will be present in the result. See examples for details.
localDiskConn
, hdfsConn
). If NULL
(default), output will be an in-memory “ddo” object.
overwrite = “backup”
to move the existing output to _bak)
filterFn
(most should be taken care of automatically such that this is rarely necessary to specify)
rhwatch
in RHIPE) - see rhipeControl
and localDiskControl
# Create a ddf using the iris data
bySpecies <- divide(iris, by = "Species")
# Filter using only the 'value' of the key/value pair
drFilter(bySpecies, function(v) mean(v$Sepal.Width) < 3)
# Filter using both the key and value
drFilter(bySpecies, function(k,v) k != "Species=virginica" & mean(v$Sepal.Width) < 3)
Ryan Hafen
Outer join of two or more distributed data object (DDO) sources by key
drJoin(..., output = NULL, overwrite = FALSE, postTransFn = NULL,
params = NULL, packages = NULL, control = NULL)
localDiskConn
, hdfsConn
). If NULL
(default), output will be an in-memory “ddo” object.
overwrite = “backup”
to move the existing output to _bak)
fn
(most should be taken care of automatically such that this is rarely necessary to specify)
rhwatch
in RHIPE) - see rhipeControl
and localDiskControl
output
connection, where the values are named lists with names according to the names given to the input data objects, and values are the corresponding data. The ddo object contains the union of all the keys contained in the input ddo objects specified in …
.
bySpecies <- divide(iris, by = "Species")
# get independent lists of just SW and SL
sw <- drLapply(bySpecies, function(x) x$Sepal.Width)
sl <- drLapply(bySpecies, function(x) x$Sepal.Length)
drJoin(Sepal.Width = sw, Sepal.Length = sl, postTransFn = as.data.frame)
Ryan Hafen
Apply a function to all key-value pairs of a ddo/ddf object and get a new ddo object back, unless a different combine
strategy is specified.
drLapply(X, FUN, combine = combDdo(), output = NULL, overwrite = FALSE,
params = NULL, packages = NULL, control = NULL, verbose = TRUE)
localDiskConn
, hdfsConn
). If NULL
(default), output will be an in-memory “ddo” object.
overwrite = “backup”
to move the existing output to _bak)
fn
(most should be taken care of automatically such that this is rarely necessary to specify)
rhwatch
in RHIPE) - see rhipeControl
and localDiskControl
combine
bySpecies <- divide(iris, by = "Species")
drLapply(bySpecies, function(x) x$Sepal.Width)
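A sketch of specifying a different combine strategy, as mentioned above, so the result comes back as a data frame rather than a ddo:
drLapply(bySpecies, function(x) data.frame(meanSW = mean(x$Sepal.Width)), combine = combRbind)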
recombine
, drFilter
, drJoin
, combDdo
, combRbind
Ryan Hafen
Take a random sample of key-value pairs from a ddo/ddf object
drSample(x, fraction, output = NULL, overwrite = FALSE, control = NULL)
localDiskConn
, hdfsConn
). If NULL
(default), output will be an in-memory “ddo” object.
overwrite = “backup”
to move the existing output to _bak)
rhwatch
in RHIPE) - see rhipeControl
and localDiskControl
bySpecies <- divide(iris, by = "Species")
set.seed(234)
sampleRes <- drSample(bySpecies, fraction = 0.25)
Return a subset of a “ddf” object to memory
drSubset(data, subset = NULL, select = NULL, drop = FALSE,
preTransFn = NULL, maxRows = 500000, params = NULL, packages = NULL,
control = NULL, verbose = TRUE)
preTransFn
to coerce each subset into a data frame
addTransform
prior to calling divide
fn
(most should be taken care of automatically such that this is rarely necessary to specify)
rhwatch
in RHIPE) - see rhipeControl
and localDiskControl
d <- divide(iris, by = "Species")
drSubset(d, Sepal.Length < 5)
Ryan Hafen
Execute a MapReduce job
mrExec(data, setup = NULL, map = NULL, reduce = NULL, output = NULL,
overwrite = FALSE, control = NULL, params = NULL, packages = NULL,
verbose = TRUE)
expression
) to be run before map and reduce
reduce = expression(pre = {…}, reduce = {…}, post = {…})
. reduce is optional; if it is not specified, a default identity reduce is performed, so the map output key-value pairs will be the result. Setting it to 0 will skip the reduce altogether (see the short sketch after the example below).
localDiskConn
, hdfsConn
). If NULL
(default), output will be an in-memory “ddo” object. If a character string, it will be treated as a path to be passed to the same type of connection as data
- relative paths will be relative to the working directory of that back end.
overwrite = “backup”
to move the existing output to _bak)
rhwatch
in RHIPE) - see rhipeControl
and localDiskControl
fn
(most should be taken care of automatically such that this is rarely necessary to specify)
# compute min and max Sepal Length by species for iris data
# using a random partitioning of it as input
d <- divide(iris, by = rrDiv(20))
mapExp <- expression({
lapply(map.values, function(r) {
by(r, r$Species, function(x) {
collect(
as.character(x$Species[1]),
range(x$Sepal.Length, na.rm = TRUE)
)
})
})
})
reduceExp <- expression(
pre = {
rng <- c(Inf, -Inf)
}, reduce = {
rx <- unlist(reduce.values)
rng <- c(min(rng[1], rx, na.rm = TRUE), max(rng[2], rx, na.rm = TRUE))
}, post = {
collect(reduce.key, rng)
})
res <- mrExec(d, map = mapExp, reduce = reduceExp)
as.list(res)
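A minimal sketch of skipping the reduce entirely (reduce = 0, as described above), in which case the map output key-value pairs are the result:
resMapOnly <- mrExec(d, map = mapExp, reduce = 0)
as.list(resMapOnly)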
Ryan Hafen
Add split variables and BSVs (if any) as columns to a subset of a ddf.
flatten(x)
d <- divide(iris, by = "Species")
# the column "Species" is no longer explicitly in the data
d[[1]]$value
# but it is preserved and can be added back in with flatten()
flatten(d[[1]]$value)
Construct a between-subset variable (BSV). For a given key-value pair, get a BSV variable value by name (if present)
bsv(val = NULL, desc = "")
getBsv(x, name)
getBsvs(x)
Should be called inside the bsvFn
argument to divide
, used for constructing a BSV list for each subset of a division.
irisDdf <- ddf(iris)
bsvFn <- function(dat) {
list(
meanSL = bsv(mean(dat$Sepal.Length), desc = "mean sepal length"),
meanPL = bsv(mean(dat$Petal.Length), desc = "mean petal length")
)
}
# divide the data by species
bySpecies <- divide(irisDdf, by = "Species", bsvFn = bsvFn)
# see BSV info attached to the result
bsvInfo(bySpecies)
# get BSVs for a specified subset of the division
getBsvs(bySpecies[[1]])
Ryan Hafen
Get global variables and package dependencies for a function
drGetGlobals(f)
This traverses the parent environments of the supplied function and finds all global variables using findGlobals
and retrieves their values. All package function calls are also found and a list of required packages is also returned.
a <- 1
f <- function(x) x + a
drGetGlobals(f)
Ryan Hafen
dfSplit(curDF, by, seed)
addSplitAttrs(curSplit, bsvFn, by, postTransFn = NULL)
These functions can be ignored. They are only exported to make their use in a distributed setting more convenient.
makeExtractable(obj)
conn <- hdfsConn("/test/irisSplit")
# add some data
addData(conn, list(list("1", iris[1:10,])))
addData(conn, list(list("2", iris[11:110,])))
addData(conn, list(list("3", iris[111:150,])))
# represent it as a distributed data frame
hdd <- ddf(conn)
# try to extract values by key (this will result in an error)
# (HDFS can only look up key-value pairs by key if data is in a mapfile)
hdd[["3"]]
# convert hdd into a mapfile
hdd <- makeExtractable(hdd)
# try again
hdd[["3"]]
Functions that are used to tabulate categorical variables and compute moments for numeric variables through the MapReduce framework. Used in updateAttributes
.
tabulateMap(formula, data)
tabulateReduce(result, reduce.values, maxUnique = NULL)
calculateMoments(y, order = 1, na.rm = TRUE)
combineMoments(m1, m2)
combineMultipleMoments(...)
moments2statistics(m)
xtabs
tabulateReduce
parameters
NULL
, it is ignored. If a positive number, only the top and bottom maxUnique
tabulations by frequency are kept.
calculateMoments
parameters
combineMoments
parameters
moments2statistics
parameters
d <- divide(iris, by = "Species", update = TRUE)
summary(d)
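A rough sketch of the moment helpers called directly, outside of MapReduce; the structure of the returned moment object is internal, so it is only combined here the way the reduce step would combine it:
m1 <- calculateMoments(rnorm(100), order = 2)
m2 <- calculateMoments(rnorm(100), order = 2)
# combine the two sets of moments as the reduce step would
combineMoments(m1, m2)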
This is used internally for conditioning variable division. It does not have much use outside of there, but is exported for convenience.
getCondCuts(df, splitVars)
# see how key names are obtained
getCondCuts(iris, "Species")
For a given key-value pair or value, get a split variable value by name, if present (split variables are variables that define how the data was divided).
Ryan Hafen
“Census Income” dataset from UCI machine learning repository
adult
(From UCI machine learning repository) Link: http://archive.ics.uci.edu/ml/datasets/Adult Donor: Ronny Kohavi and Barry Becker, Data Mining and Visualization, Silicon Graphics. e-mail: ronnyk@live.com for questions.
Data Set Information: Extraction was done by Barry Becker from the 1994 Census database. A set of reasonably clean records was extracted using the following conditions: ((AAGE>16) && (AGI>100) && (AFNLWGT>1) && (HRSWK>0))
Bache, K. & Lichman, M. (2013). UCI Machine Learning Repository [http://archive.ics.uci.edu/ml]. Irvine, CA: University of California, School of Information and Computer Science.