Skale API¶
The Skale module is the main entry point for Skale functionality.
To use it, one must require('skale').
skale.context([config])¶
Creates and returns a new context which represents the connection to the Skale cluster, and which can be used to create datasets on that cluster. Config is an Object which defines the cluster server, with the following defaults:
    {
      host: 'localhost', // Cluster server host, settable also by SKALE_HOST env
      port: '12346'      // Cluster server port, settable also by SKALE_PORT env
    }
Example:
    var skale = require('skale');
    var sc = skale.context();
sc.env¶
The sc.env property returns an object containing user environment variables to be set in workers. To set and propagate an environment variable to all workers, assign it in the sc.env object prior to invoking an action.
Example:
sc.env.MY_VAR = 'my_value';
sc.end()¶
Closes the connection to the cluster.
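A typical pattern is to invoke it once the last action has completed, as in this sketch:

    sc.parallelize([1, 2, 3])
      .count()
      .then(function (result) {
        console.log(result);
        sc.end(); // release the cluster connection so the process can exit
      });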
sc.lineStream(input_stream)¶
Returns a new dataset of lines of text read from input_stream, a readable stream from which the dataset content is read.
The following example computes the size of a file using streams:
    var stream = fs.createReadStream('data.txt', 'utf8');
    sc.lineStream(stream)
      .map(s => s.length)
      .reduce((a, b) => a + b, 0)
      .then(console.log);
sc.objectStream(input_stream)¶
Returns a new dataset of JavaScript objects read from input_stream, a readable stream from which the dataset content is read.
The following example counts the number of objects returned in an object stream using the MongoDB native JavaScript driver:
    var cursor = db.collection('clients').find();
    sc.objectStream(cursor).count().then(console.log);
sc.parallelize(array)¶
Returns a new dataset containing elements from the Array array.
Example:
var a = sc.parallelize(['Hello', 'World']);
sc.range(start[, end[, step]])¶
Returns a new dataset of integers from start up to end (exclusive), incremented by step (default 1) between consecutive elements. If called with a single argument, the argument is interpreted as end, and start is set to 0.
    sc.range(5).collect().then(console.log);         // [ 0, 1, 2, 3, 4 ]
    sc.range(2, 4).collect().then(console.log);      // [ 2, 3 ]
    sc.range(10, -5, -3).collect().then(console.log); // [ 10, 7, 4, 1, -2 ]
sc.require(modules)¶
Sets a list of dependency modules to be deployed in workers for use by callbacks, such as mappers or reducers. Returns the context object.
- modules: an Object of the form {name1: 'path1', ...} where name1 is the name of the variable to which the module is assigned, and path1 a path expression as in require.resolve(path).
Under the hood, browserify is used on the master side to build a bundle which is serialized and sent to workers, where it is then evaluated in the global context.
Example:
    // deps.js contains:
    // module.exports = function add3(a) {return a + 3;};
    sc.require({add3: './deps.js'})
      .range(4)
      .map(a => add3(a))
      .collect()
      .then(console.log);
    // [ 3, 4, 5, 6 ]
sc.source(size, callback[, args])¶
Returns a new dataset of size elements, where each element is generated by a custom function callback executed on workers.
- size: an integer Number of elements in the dataset
- callback: a function of the form function(index, args[, wc]) which returns the next element, and with:
  - index: the index of the element in the dataset, ranging between 0 and size - 1
  - args: the same parameter args passed to source()
  - wc: the worker context, a persistent object local to each worker, where user can store and access worker local dependencies
- args: user custom parameter, passed to the callback
    function randArray(index, len) {
      var arr = [];
      for (var i = 0; i < len; i++)
        arr.push(Math.floor(Math.random() * 100));
      return arr;
    }
    sc.source(3, randArray, 2).collect().then(console.log);
    // [ [ 31, 85 ], [ 93, 21 ], [ 99, 58 ] ]
sc.textFile(path[, options])¶
Returns a new dataset of lines in the file specified by the path String.
- path: a String of the general form protocol://host/path or /path, where protocol can be one of:
  - file: if path is on the local filesystem
  - s3: if path refers to a repository on the AWS S3 storage system
  - wasb: if path refers to a repository on the Azure blob storage system
- options: an Object with the following fields:
  - maxFiles: a Number of maximum files to process if the path refers to a directory.
  - parquet: a Boolean to indicate that all files are in the Parquet format. Default value is false.
If path ends with a '/' (directory separator), then the dataset will be composed of all the files in that directory. Sub-directories are not supported. Wildcard characters such as * and ?, as in Unix shell globbing patterns, are supported.
If a file name ends with '.gz', its content is automatically uncompressed using gzip.
If a file name ends with '.parquet', it is automatically processed as a Parquet file.
Note: If using a path on the local filesystem, the file must also be accessible at the same path on worker nodes. Either copy the file to all workers or use a network-mounted shared file system.
For example, the following program prints the length of a text file:
    var lines = sc.textFile('data.txt');
    lines.map(s => s.length).reduce((a, b) => a + b, 0).then(console.log);
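The options above apply to directory reads. A minimal sketch, assuming a hypothetical local directory /data/logs/ containing text files:

    // Count lines over at most 10 files of the directory (path and limit are illustrative)
    sc.textFile('/data/logs/', {maxFiles: 10})
      .count()
      .then(console.log);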
Dataset methods¶
Dataset objects, as created initially by the skale context source functions above, provide the following methods, allowing either to instantiate a new dataset through a transformation, or to return results to the master program.
ds.aggregate(reducer, combiner, init[, obj][, done])¶
This action computes the aggregated value of the elements of the dataset using two functions reducer() and combiner(), allowing the use of an arbitrary accumulator type, different from the element type (as opposed to reduce(), which imposes the same type for accumulator and element). The result is passed to the done() callback if provided, otherwise an ES6 promise is returned.
- reducer: a function of the form function(acc, val[, obj[, wc]]), which returns the next value of the accumulator (which must be of the same type as acc) and with:
  - acc: the value of the accumulator, initially set to init
  - val: the value of the next element of the dataset on which aggregate() operates
  - obj: the same parameter obj passed to aggregate()
  - wc: the worker context, a persistent object local to each worker, where user can store and access worker local dependencies
- combiner: a function of the form function(acc1, acc2[, obj]), which returns the merged value of the accumulators and with:
  - acc1: the value of an accumulator, computed locally on a worker
  - acc2: the value of another accumulator, issued by another worker
  - obj: the same parameter obj passed to aggregate()
- init: the initial value of the accumulators used by reducer() and combiner(). It should be the identity element of the operation (a neutral zero value, i.e. applying it through the function should not change the result).
- obj: user provided data, serialized and sent from master to workers; obj is shared amongst reducer executions over each element of the dataset
- done: a callback of the form function(error, result) which is called at completion. If undefined, aggregate() returns an ES6 promise.
The following example computes the average of a dataset, avoiding a map():
    sc.parallelize([3, 5, 2, 7, 4, 8])
      .aggregate((a, v) => [a[0] + v, a[1] + 1],
                 (a1, a2) => [a1[0] + a2[0], a1[1] + a2[1]], [0, 0])
      .then(function(data) {
        console.log(data[0] / data[1]);
      });
    // 4.8333
ds.aggregateByKey(reducer, combiner, init[, obj])¶
When called on a dataset of type [k,v], returns a dataset of type [k,v] where v is the aggregated value of all elements with the same key k. The aggregation is performed using two functions reducer() and combiner(), allowing the use of an arbitrary accumulator type, different from the element type.
- reducer: a function of the form function(acc, val[, obj[, wc]]), which returns the next value of the accumulator (which must be of the same type as acc) and with:
  - acc: the value of the accumulator, initially set to init
  - val: the value v of the next [k,v] element of the dataset on which aggregateByKey() operates
  - obj: the same parameter obj passed to aggregateByKey()
  - wc: the worker context, a persistent object local to each worker, where user can store and access worker local dependencies
- combiner: a function of the form function(acc1, acc2[, obj]), which returns the merged value of the accumulators and with:
  - acc1: the value of an accumulator, computed locally on a worker
  - acc2: the value of another accumulator, issued by another worker
  - obj: the same parameter obj passed to aggregateByKey()
- init: the initial value of the accumulators used by reducer() and combiner(). It should be the identity element of the operation (a neutral zero value, i.e. applying it through the function should not change the result).
- obj: user provided data, serialized and sent from master to workers; obj is shared amongst reducer executions over each element of the dataset
Example:
    sc.parallelize([['hello', 1], ['hello', 1], ['world', 1]])
      .aggregateByKey((a, b) => a + b, (a, b) => a + b, 0)
      .collect()
      .then(console.log);
    // [ [ 'hello', 2 ], [ 'world', 1 ] ]
ds.cartesian(other)¶
Returns a dataset which contains all possible pairs [a, b] where a is in the source dataset and b is in the other dataset.
Example:
    var ds1 = sc.parallelize([1, 2]);
    var ds2 = sc.parallelize(['a', 'b', 'c']);
    ds1.cartesian(ds2).collect().then(console.log);
    // [ [ 1, 'a' ], [ 1, 'b' ], [ 1, 'c' ],
    //   [ 2, 'a' ], [ 2, 'b' ], [ 2, 'c' ] ]
ds.coGroup(other)¶
When called on datasets of type [k,v] and [k,w], returns a dataset of type [k, [[v], [w]]], where data of both datasets share the same key.
Example:
    var ds1 = sc.parallelize([[10, 1], [20, 2]]);
    var ds2 = sc.parallelize([[10, 'world'], [30, 3]]);
    ds1.coGroup(ds2).collect().then(console.log);
    // [ [ 10, [ [ 1 ], [ 'world' ] ] ],
    //   [ 20, [ [ 2 ], [] ] ],
    //   [ 30, [ [], [ 3 ] ] ] ]
ds.collect([done])¶
This action returns the content of the dataset in the form of an array. The result is passed to the done() callback if provided, otherwise an ES6 promise is returned.
- done: a callback of the form function(error, result) which is called at completion.
Example:
    sc.parallelize([1, 2, 3, 4])
      .collect(function (err, res) {
        console.log(res);
      });
    // [ 1, 2, 3, 4 ]
ds.count([done])¶
This action computes the number of elements in the dataset. The result is passed to the done() callback if provided, otherwise an ES6 promise is returned.
- done: a callback of the form function(error, result) which is called at completion.
Example:
sc.parallelize([10, 20, 30, 40]).count().then(console.log); // 4
ds.countByKey([done])¶
When called on a dataset of type [k,v], this action computes the number of occurrences of elements for each key. It produces an array of elements of type [k,w] where w is the resulting count. The result is passed to the done() callback if provided, otherwise an ES6 promise is returned.
- done: a callback of the form function(error, result) which is called at completion.
Example:
    sc.parallelize([[10, 1], [20, 2], [10, 4]])
      .countByKey().then(console.log);
    // [ [ 10, 2 ], [ 20, 1 ] ]
ds.countByValue([done])¶
This action computes the number of occurrences of each element in the dataset and returns an array of elements of type [v,n] where v is the element and n its number of occurrences. The result is passed to the done() callback if provided, otherwise an ES6 promise is returned.
- done: a callback of the form function(error, result) which is called at completion.
Example:
    sc.parallelize([1, 2, 3, 1, 3, 2, 5])
      .countByValue().then(console.log);
    // [ [ 1, 2 ], [ 2, 2 ], [ 3, 2 ], [ 5, 1 ] ]
ds.distinct()¶
Returns a dataset where duplicates are removed.
Example:
    sc.parallelize([1, 2, 3, 1, 4, 3, 5])
      .distinct()
      .collect().then(console.log);
    // [ 1, 2, 3, 4, 5 ]
ds.filter(filter[, obj])¶
- filter: a function of the form callback(element[, obj[, wc]]), returning a Boolean and where:
  - element: the next element of the dataset on which filter() operates
  - obj: the same parameter obj passed to filter()
  - wc: the worker context, a persistent object local to each worker, where user can store and access worker local dependencies
- obj: user provided data, serialized and sent from master to workers; obj is shared amongst filter executions over each element of the dataset
Applies the provided filter function to each element of the source dataset and returns a new dataset containing the elements that passed the test.
Example:
    function filter(data, obj) {
      return data % obj.modulo;
    }
    sc.parallelize([1, 2, 3, 4])
      .filter(filter, {modulo: 2})
      .collect().then(console.log);
    // [ 1, 3 ]
ds.first([done])¶
This action computes the first element in this dataset. The result is passed to the done() callback if provided, otherwise an ES6 promise is returned.
- done: a callback of the form function(error, result) which is called at completion.
sc.parallelize([1, 2, 3]).first().then(console.log); // 1
ds.flatMap(flatMapper[, obj])¶
Applies the provided mapper function to each element of the source dataset and returns a new dataset.
- flatMapper: a function of the form callback(element[, obj[, wc]]), returning an Array and where:
  - element: the next element of the dataset on which flatMap() operates
  - obj: the same parameter obj passed to flatMap()
  - wc: the worker context, a persistent object local to each worker, where user can store and access worker local dependencies
- obj: user provided data, serialized and sent from master to workers; obj is shared amongst flatMapper executions over each element of the dataset
Example:
sc.range(5).flatMap(a => [a, a]).collect().then(console.log); // [ 0, 0, 1, 1, 2, 2, 3, 3, 4, 4 ]
ds.flatMapValues(flatMapper[, obj])¶
Applies the provided flatMapper function to the value of each [k,v] element of the source dataset and returns a new dataset containing an element [k, w] for each value w in the array returned by the flatMapper, keeping the key unchanged for each source element.
- flatMapper: a function of the form callback(element[, obj[, wc]]), returning an Array and where:
  - element: the value v of the next [k,v] element of the dataset on which flatMapValues() operates
  - obj: the same parameter obj passed to flatMapValues()
  - wc: the worker context, a persistent object local to each worker, where user can store and access worker local dependencies
- obj: user provided data, serialized and sent from master to workers; obj is shared amongst flatMapper executions over each element of the dataset
Example:
    function valueFlatMapper(data, obj) {
      var tmp = [];
      for (var i = 0; i < obj.N; i++)
        tmp.push(data * obj.fact);
      return tmp;
    }
    sc.parallelize([['hello', 1], ['world', 2]])
      .flatMapValues(valueFlatMapper, {N: 2, fact: 2})
      .collect().then(console.log);
    // [ [ 'hello', 2 ], [ 'hello', 2 ], [ 'world', 4 ], [ 'world', 4 ] ]
ds.forEach(callback[, obj][, done])¶
This action applies a callback function on each element of the dataset. If provided, the done() callback is invoked at completion, otherwise an ES6 promise is returned.
- callback: a function of the form function(val[, obj[, wc]]), which returns null and with:
  - val: the value of the next element of the dataset on which forEach() operates
  - obj: the same parameter obj passed to forEach()
  - wc: the worker context, a persistent object local to each worker, where user can store and access worker local dependencies
- obj: user provided data, serialized and sent from master to workers; obj is shared amongst callback executions over each element of the dataset
- done: a callback of the form function(error, result) which is called at completion.
In the following example, the console.log() callback provided to forEach() is executed on workers and may not be visible on the master:
    sc.parallelize([1, 2, 3, 4])
      .forEach(console.log)
      .then(() => console.log('finished'));
ds.groupByKey()¶
When called on a dataset of type [k,v], returns a dataset of type [k, [v]] where values with the same key are grouped.
Example:
    sc.parallelize([[10, 1], [20, 2], [10, 4]])
      .groupByKey().collect().then(console.log);
    // [ [ 10, [ 1, 4 ] ], [ 20, [ 2 ] ] ]
ds.intersection(other)¶
Returns a dataset containing only the elements found in both the source dataset and the other dataset.
Example:
    var ds1 = sc.parallelize([1, 2, 3, 4, 5]);
    var ds2 = sc.parallelize([3, 4, 5, 6, 7]);
    ds1.intersection(ds2).collect().then(console.log);
    // [ 3, 4, 5 ]
ds.join(other)¶
When called on a source dataset of type [k,v] and an other dataset of type [k,w], returns a dataset of [k, [v, w]] pairs containing all pairs of elements for each key present in both datasets.
Example:
    var ds1 = sc.parallelize([[10, 1], [20, 2]]);
    var ds2 = sc.parallelize([[10, 'world'], [30, 3]]);
    ds1.join(ds2).collect().then(console.log);
    // [ [ 10, [ 1, 'world' ] ] ]
ds.keys()¶
When called on a source dataset of type [k,v], returns a dataset containing just the keys k.
Example:
    sc.parallelize([[10, 'world'], [30, 3]])
      .keys().collect().then(console.log);
    // [ 10, 30 ]
ds.leftOuterJoin(other)¶
When called on a source dataset of type [k,v] and an other dataset of type [k,w], returns a dataset of [k, [v, w]] pairs for each key of the source dataset. If a key has no match in the other dataset, w is set to null.
Example:
    var ds1 = sc.parallelize([[10, 1], [20, 2]]);
    var ds2 = sc.parallelize([[10, 'world'], [30, 3]]);
    ds1.leftOuterJoin(ds2).collect().then(console.log);
    // [ [ 10, [ 1, 'world' ] ], [ 20, [ 2, null ] ] ]
ds.lookup(k[, done])¶
When called on a source dataset of type [k,v], returns an array of values v for key k. The result is passed to the done() callback if provided, otherwise an ES6 promise is returned.
- done: a callback of the form function(error, result) which is called at completion.
Example:
    sc.parallelize([[10, 'world'], [20, 2], [10, 1], [30, 3]])
      .lookup(10).then(console.log);
    // [ 'world', 1 ]
ds.map(mapper[, obj])¶
Applies the provided mapper function to each element of the source dataset and returns a new dataset.
- mapper: a function of the form callback(element[, obj[, wc]]), returning an element and where:
  - element: the next element of the dataset on which map() operates
  - obj: the same parameter obj passed to map()
  - wc: the worker context, a persistent object local to each worker, where user can store and access worker local dependencies
- obj: user provided data, serialized and sent from master to workers; obj is shared amongst mapper executions over each element of the dataset
Example:
    sc.parallelize([1, 2, 3, 4])
      .map((data, obj) => data * obj.scaling, {scaling: 1.2})
      .collect().then(console.log);
    // [ 1.2, 2.4, 3.6, 4.8 ]
ds.mapValues(mapper[, obj])¶
- mapper: a function of the form callback(element[, obj[, wc]]), returning an element and where:
  - element: the value v of the next [k,v] element of the dataset on which mapValues() operates
  - obj: the same parameter obj passed to mapValues()
  - wc: the worker context, a persistent object local to each worker, where user can store and access worker local dependencies
- obj: user provided data, serialized and sent from master to workers; obj is shared amongst mapper executions over each element of the dataset
Applies the provided mapper function to the value of each [k,v] element of the source dataset and returns a new dataset containing elements defined as [k, mapper(v)], keeping the key unchanged for each source element.
Example:
    sc.parallelize([['hello', 1], ['world', 2]])
      .mapValues((a, obj) => a * obj.fact, {fact: 2})
      .collect().then(console.log);
    // [ [ 'hello', 2 ], [ 'world', 4 ] ]
ds.partitionBy(partitioner)¶
Returns a dataset partitioned using the specified partitioner. The purpose of this transformation is not to change the dataset content, but to increase processing speed by ensuring that the elements accessed by further transformations reside in the same partition.
Example:
    var skale = require('skale');
    var sc = skale.context();
    sc.parallelize([['hello', 1], ['world', 1], ['hello', 2], ['world', 2], ['cedric', 3]])
      .partitionBy(new skale.HashPartitioner(3))
      .collect().then(console.log);
    // [ ['world', 1], ['world', 2], ['hello', 1], ['hello', 2], ['cedric', 3] ]
ds.persist()¶
Returns the dataset, and persists the dataset content on disk (and in memory if available) in order to directly reuse content in further tasks.
Example:
    var dataset = sc.range(100).map(a => a * a).persist();
    // First action: computes the dataset and persists it
    dataset.collect().then(console.log);
    // Second action: reuses the persisted dataset, skipping the map transform
    dataset.collect().then(console.log);
ds.reduce(reducer, init[, obj][, done])¶
This action returns the aggregated value of the elements of the dataset using a reducer() function. The result is passed to the done() callback if provided, otherwise an ES6 promise is returned.
- reducer: a function of the form function(acc, val[, obj[, wc]]), which returns the next value of the accumulator (which must be of the same type as acc and val) and with:
  - acc: the value of the accumulator, initially set to init
  - val: the value of the next element of the dataset on which reduce() operates
  - obj: the same parameter obj passed to reduce()
  - wc: the worker context, a persistent object local to each worker, where user can store and access worker local dependencies
- init: the initial value of the accumulator used by reducer(). It should be the identity element of the operation (i.e. applying it through the function should not change the result).
- obj: user provided data, serialized and sent from master to workers; obj is shared amongst reducer executions over each element of the dataset
- done: a callback of the form function(error, result) which is called at completion.
Example:
    sc.parallelize([1, 2, 4, 8])
      .reduce((a, b) => a + b, 0)
      .then(console.log);
    // 15
ds.reduceByKey(reducer, init[, obj])¶
- reducer: a function of the form callback(acc, val[, obj[, wc]]), returning the next value of the accumulator (which must be of the same type as acc and val) and where:
  - acc: the value of the accumulator, initially set to init
  - val: the value v of the next [k,v] element of the dataset on which reduceByKey() operates
  - obj: the same parameter obj passed to reduceByKey()
  - wc: the worker context, a persistent object local to each worker, where user can store and access worker local dependencies
- init: the initial value of the accumulator for each key, passed to the reducer
- obj: user provided data, serialized and sent from master to workers; obj is shared amongst reducer executions over each element of the dataset
When called on a dataset of type [k,v], returns a dataset of type [k,v] where the values of each key are aggregated using the reducer function and the init initial value.
Example:
    sc.parallelize([[10, 1], [10, 2], [10, 4]])
      .reduceByKey((a, b) => a + b, 0)
      .collect().then(console.log);
    // [ [ 10, 7 ] ]
ds.rightOuterJoin(other)¶
When called on a source dataset of type [k,v] and an other dataset of type [k,w], returns a dataset of [k, [v, w]] pairs for each key of the other dataset. If a key has no match in the source dataset, v is set to null.
Example:
    var ds1 = sc.parallelize([[10, 1], [20, 2]]);
    var ds2 = sc.parallelize([[10, 'world'], [30, 3]]);
    ds1.rightOuterJoin(ds2).collect().then(console.log);
    // [ [ 10, [ 1, 'world' ] ], [ 30, [ null, 3 ] ] ]
ds.sample(withReplacement, frac)¶
- withReplacement: Boolean value, true if data must be sampled with replacement
- frac: Number value of the fraction of source dataset to return
Returns a dataset by sampling a fraction frac of source dataset, with or without replacement.
Example:
    sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8])
      .sample(true, 0.5)
      .collect().then(console.log);
    // [ 1, 1, 3, 4, 4, 5, 7 ]
ds.save(url[, options][, done])¶
This action saves the content of the dataset to the destination URL. The destination is a flat directory which will contain as many files as there are partitions in the dataset. Files are named after partition numbers, starting at 0. The file format is a stream of JSON strings (one per dataset element) separated by newlines.
- url: a String of the general form protocol://host/path or /path. See below for supported protocols.
- options: an Object with the following fields:
  - gzip: Boolean (default false) to enable gzip compression. If compression is enabled, files are suffixed with .gz
  - csv: Object (default undefined) with the following fields:
    - header: optional String to define the first line
    - sep: String to define the separator (default ;)
- done: an optional callback function of the form function(error, result) called at completion. If not provided, an ES6 promise is returned.
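A hedged sketch of the csv option, assuming the serializer joins the fields of each array element with sep (the header string and output path below are illustrative):

    sc.parallelize([['alice', 1], ['bob', 2]])
      .save('/tmp/names/', {csv: {header: 'name;count', sep: ';'}})
      .then(() => sc.end());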
File protocol¶
The URL form is file://path or simply path, where path is an absolute pathname in the master host local file system.
Example:
    sc.range(300).save('/tmp/results/').then(() => sc.end());
    // will produce /tmp/results/0, /tmp/results/1
AWS S3 protocol¶
The URL form is s3://bucket/key. AWS credentials must be provided by environment variables, i.e. AWS_SECRET_ACCESS_KEY and AWS_ACCESS_KEY_ID.
Example:
    sc.range(300).save('s3://myproject/mydataset', {gzip: true}).then(() => sc.end());
    // will produce https://myproject.s3.amazonaws.com/mydataset/0.gz
Azure blob storage protocol¶
The URL form is wasb://container@user.blob.core.windows.net/blob_name. Azure credentials must be provided by environment variables, i.e. AZURE_STORAGE_ACCOUNT and AZURE_STORAGE_ACCESS_KEY.
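Example (the container and account names below are illustrative):

    sc.range(300).save('wasb://mycontainer@myaccount.blob.core.windows.net/mydataset').then(() => sc.end());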
ds.sortBy(keyfunc[, ascending])¶
Returns a dataset sorted by the given keyfunc.
- keyfunc: a function of the form function(element) which returns a value used for comparison in the sort function, where element is the next element of the dataset on which sortBy() operates
- ascending: a boolean to set the sort direction. Default: true
Example:
    sc.parallelize([4, 6, 10, 5, 1, 2, 9, 7, 3, 0])
      .sortBy(a => a)
      .collect().then(console.log);
    // [ 0, 1, 2, 3, 4, 5, 6, 7, 9, 10 ]
ds.sortByKey(ascending)¶
When called on a dataset of type [k,v], returns a dataset of type [k,v] sorted on k. The optional parameter ascending is a boolean which sets the sort direction, true by default.
Example:
    sc.parallelize([['world', 2], ['cedric', 3], ['hello', 1]])
      .sortByKey()
      .collect().then(console.log);
    // [ [ 'cedric', 3 ], [ 'hello', 1 ], [ 'world', 2 ] ]
ds.stream([opt])¶
This action returns a readable stream of dataset content. The order of data and partitions is maintained.
- opt: an object with the following fields:
  - end: Boolean, when true, call sc.end() on stream end event. Default value: false.
  - gzip: Boolean, when true, enable gzip compression. Default value: false.
Example:
    var s = sc.range(4).stream();
    s.pipe(process.stdout);
    // 0
    // 1
    // 2
    // 3
ds.subtract(other)¶
Returns a dataset containing only the elements of the source dataset which are not present in the other dataset.
Example:
    var ds1 = sc.parallelize([1, 2, 3, 4, 5]);
    var ds2 = sc.parallelize([3, 4, 5, 6, 7]);
    ds1.subtract(ds2).collect().then(console.log);
    // [ 1, 2 ]
ds.take(num[, done])¶
This action returns an array of the num first elements of the source dataset. The result is passed to the done() callback if provided, otherwise an ES6 promise is returned.
- num: positive integer Number of elements
- done: a callback of the form function(error, result) which is called at completion.
Example:
    sc.range(5).take(2).then(console.log); // [ 0, 1 ]
ds.takeSample(withReplacement, num[, done])¶
This action returns an array with a random sample of num elements of the dataset, with or without replacement. The result is passed to the done() callback if provided, otherwise an ES6 promise is returned.
- withReplacement: Boolean value, true if data must be sampled with replacement
- num: positive integer Number of elements
- done: a callback of the form function(error, result) which is called at completion.
Example:
    sc.range(100).takeSample(false, 4).then(console.log); // [ 18, 75, 4, 57 ]
ds.top(num[, done])¶
This action returns an array of the num top elements of the source dataset. The result is passed to the done() callback if provided, otherwise an ES6 promise is returned.
- num: positive integer Number of elements
- done: a callback of the form function(error, result) which is called at completion.
Example:
sc.range(5).top(2).then(console.log); // [3, 4]
ds.union(other)¶
Returns a dataset that contains the union of the elements in the source dataset and the other dataset.
Example:
    var ds1 = sc.parallelize([1, 2, 3, 4, 5]);
    var ds2 = sc.parallelize([3, 4, 5, 6, 7]);
    ds1.union(ds2).collect().then(console.log);
    // [ 1, 2, 3, 4, 5, 3, 4, 5, 6, 7 ]
ds.values()¶
When called on a source dataset of type [k,v], returns a dataset containing just the values v.
Example:
    sc.parallelize([[10, 'world'], [30, 3]])
      .values().collect().then(console.log);
    // [ 'world', 3 ]
Partitioners¶
A partitioner is an object passed to ds.partitionBy(partitioner) which places data in partitions according to a strategy, for example hash partitioning, where data having the same key are placed in the same partition, or range partitioning, where data in the same range are in the same partition. This is useful to accelerate processing, as it limits data transfers between workers during jobs.
A partitioner object must provide the following properties:
- numPartitions: a Number of partitions for the dataset
- getPartitionIndex: a Function of the form function(element) which returns the partition index (between 0 and numPartitions - 1) for the element of the dataset on which partitionBy() operates.
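For instance, a minimal sketch of a custom partitioner satisfying this interface, assuming a dataset of [k,v] elements with numeric keys (the modulo strategy here is purely illustrative):

    function ModuloPartitioner(numPartitions) {
      this.numPartitions = numPartitions;
      this.getPartitionIndex = function (element) {
        // place each [k,v] element according to its key, modulo the partition count
        return element[0] % numPartitions;
      };
    }
    var dataset = sc.range(10)
      .map(a => [a, a * a])
      .partitionBy(new ModuloPartitioner(3));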
HashPartitioner(numPartitions)¶
Returns a partitioner object which implements hash-based partitioning, using a hash checksum of the string representation of each element.
- numPartitions: Number of partitions for this dataset
Example:
    var hp = new skale.HashPartitioner(3);
    var dataset = sc.range(10).partitionBy(hp);
RangePartitioner(numPartitions, keyfunc, dataset)¶
Returns a partitioner object which first defines ranges by sampling the dataset and then places elements by comparing them with ranges.
- numPartitions: Number of partitions for this dataset
- keyfunc: a function of the form function(element) which returns a value used for comparison in the sort function, where element is the next element of the dataset on which partitionBy() operates
- dataset: the dataset object on which partitionBy() operates
Example:
    var dataset = sc.range(100);
    var rp = new skale.RangePartitioner(3, a => a, dataset);
    dataset = sc.range(10).partitionBy(rp);
Environment variables¶
- SKALE_HOST: the hostname of the skale-server process in distributed mode. If unset, the master runs in standalone mode.
- SKALE_PORT: the port of the skale-server process in distributed mode. Default value: "12346"
- SKALE_KEY: an authentication token which may be required by the skale-server process
- SKALE_DEBUG: sets the debug trace level to one of the following values:
  - 0 or unset: no traces
  - 1: debug traces from the master side
  - 2: above traces plus worker traces
  - 3: above traces plus network protocol traces (if running in distributed mode)
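These variables are normally set in the shell before launching the program. As a sketch, they can also be set programmatically before creating the context (the values below are illustrative):

    process.env.SKALE_HOST = 'cluster.example.com'; // illustrative distributed server host
    process.env.SKALE_DEBUG = '1';                  // enable master-side debug traces
    var skale = require('skale');
    var sc = skale.context();                       // reads SKALE_* variables at creation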