Skale API

The Skale module is the main entry point for Skale functionality. To use it, one must require('skale').

skale.context([config])

Creates and returns a new context which represents the connection to the Skale cluster, and which can be used to create datasets on that cluster. Config is an Object which defines the cluster server, with the following defaults:

{
  host: 'localhost',  // Cluster server host, settable also by SKALE_HOST env
  port: '12346'       // Cluster server port, settable also by SKALE_PORT env
}

Example:

var skale = require('skale');
var sc = skale.context();
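
To connect to a remote cluster, the defaults can be overridden (host and port values below are placeholders):

var sc = skale.context({host: 'skale-server.example.com', port: '12346'});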

sc.env

The sc.env property returns an object containing user environment variables to be set in workers.

To set and propagate an environment variable to all workers, assign a property of the sc.env object prior to invoking an action.

Example:

sc.env.MY_VAR = 'my_value';
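
The variable should then be visible to callbacks running on workers. A minimal sketch, assuming workers expose it through process.env once the next action runs:

sc.env.MY_VAR = 'my_value';

sc.range(2)
  .map(() => process.env.MY_VAR)    // read back on workers
  .collect()
  .then(console.log);
// [ 'my_value', 'my_value' ]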

sc.end()

Closes the connection to the cluster.
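
For instance, to terminate the program once a final action has completed:

sc.parallelize([1, 2, 3])
  .count()
  .then(function (result) {
    console.log(result);
    sc.end();
  });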

sc.lineStream(input_stream)

Returns a new dataset of lines of text read from input_stream, a readable stream providing the dataset content.

The following example computes the cumulated length of the lines of a file, using streams:

var stream = fs.createReadStream('data.txt', 'utf8');
sc.lineStream(stream)
  .map(s => s.length)
  .reduce((a, b) => a + b, 0)
  .then(console.log);

sc.objectStream(input_stream)

Returns a new dataset of Javascript Objects read from input_stream, a readable stream providing the dataset content.

The following example counts the number of objects returned in an object stream using the mongodb native Javascript driver:

var cursor = db.collection('clients').find();
sc.objectStream(cursor).count().then(console.log);

sc.parallelize(array)

Returns a new dataset containing elements from the Array array.

Example:

var a = sc.parallelize(['Hello', 'World']);

sc.range(start[, end[, step]])

Returns a new dataset of integers from start up to, but not including, end, incremented by step (default 1). If called with a single argument, the argument is interpreted as end and start is set to 0.

sc.range(5).collect().then(console.log)
// [ 0, 1, 2, 3, 4 ]
sc.range(2, 4).collect().then(console.log)
// [ 2, 3 ]
sc.range(10, -5, -3).collect().then(console.log)
// [ 10, 7, 4, 1, -2 ]

sc.require(modules)

Sets a list of dependency modules to be deployed in workers for use by callbacks, such as mappers or reducers. Returns the context object.

  • modules: an Object of the form {name1: 'path1', ...} where name1 is the name of the variable to which the module is assigned, and path1 is a path expression as accepted by require.resolve(path).

Under the hood, browserify is used on the master side to build a bundle, which is serialized and sent to workers, where it is then evaluated in the global context.

Example:

// deps.js contains:
// module.exports = function add3(a) {return a + 3;};

sc.require({add3: './deps.js'})
  .range(4)
  .map(a => add3(a))
  .collect()
  .then(console.log);
// [ 3, 4, 5, 6 ]

sc.source(size, callback[, args])

Returns a new dataset of size elements, where each element is generated by a custom function callback executed on workers.

  • size: an integer Number of elements in the dataset
  • callback: a function of the form function(index, args[, wc]) which returns the next element and with:
    • index: the index of the element in the dataset, between 0 and size - 1
    • args: the same parameter args passed to source()
    • wc: the worker context, a persistent object local to each worker, where user can store and access worker local dependencies
  • args: user custom parameter, passed to callback

Example:

function randArray(index, len) {
  var arr = [];
  for (var i = 0; i < len; i++)
    arr.push(Math.floor(Math.random() * 100));
  return arr;
}

sc.source(3, randArray, 2).collect().then(console.log);
// [ [ 31, 85 ], [ 93, 21 ], [ 99, 58 ] ]
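
The worker context wc can be used to keep per-worker state between calls. A minimal sketch, assuming the same wc object is passed to every invocation of the callback on a given worker:

function counter(index, args, wc) {
  wc.calls = (wc.calls || 0) + 1;  // persists across calls on this worker
  return wc.calls;
}

sc.source(4, counter).collect().then(console.log);
// e.g. [ 1, 2, 1, 2 ] with 2 workers; actual values depend on scheduling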

sc.textFile(path[, options])

Returns a new dataset of lines in the file specified by the path String.

  • path: a String of the general form protocol://host/path or /path, where protocol can be one of:
    • file: if path is on the local filesystem
    • s3: if path refers to a repository on the AWS [S3] storage system
    • wasb: if path refers to a repository on the Azure blob storage system
  • options: an Object with the following fields:
    • maxFiles: a Number of maximum files to process if the path refers to a directory
    • parquet: a Boolean to indicate that all files are in the [parquet] format. Default value is false.

If path ends with a '/' (directory separator), the dataset is composed of all the files in that directory. Sub-directories are not supported. Wildcard characters such as * and ?, as in Unix shell globbing patterns, are supported.

If a file name ends with '.gz', its content is automatically uncompressed using GZIP.

If a file name ends with '.parquet', it is automatically processed as a [parquet] file.

Note: If using a path on the local filesystem, the file must also be accessible at the same path on worker nodes. Either copy the file to all workers or use a network-mounted shared file system.

For example, the following program prints the cumulated length of the lines of a text file:

var lines = sc.textFile('data.txt');
lines.map(s => s.length).reduce((a, b) => a + b, 0).then(console.log);
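
Directory and compressed inputs can be combined. A sketch, assuming /data/logs/ is accessible at the same path on all workers:

// Count lines over all files in a directory
sc.textFile('/data/logs/').count().then(console.log);

// Same, restricted to gzipped files, uncompressed on the fly
sc.textFile('/data/logs/*.gz').count().then(console.log);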

Dataset methods

Dataset objects, as initially created by the skale context source functions above, provide the following methods, which either produce a new dataset through a transformation, or return results to the master program.

ds.aggregate(reducer, combiner, init[, obj][, done])

This action computes an aggregated value over the elements of the dataset using two functions, reducer() and combiner(), allowing an arbitrary accumulator type, different from the element type (as opposed to reduce(), which imposes the same type for accumulator and elements). The result is passed to the done() callback if provided, otherwise an ES6 promise is returned.

  • reducer: a function of the form function(acc, val[, obj[, wc]]), which returns the next value of the accumulator (which must be of the same type as acc) and with:
    • acc: the value of the accumulator, initially set to init
    • val: the value of the next element of the dataset on which aggregate() operates
    • obj: the same parameter obj passed to aggregate()
    • wc: the worker context, a persistent object local to each worker, where user can store and access worker local dependencies.
  • combiner: a function of the form function(acc1, acc2[, obj]), which returns the merged value of accumulators and with:
    • acc1: the value of an accumulator, computed locally on a worker
    • acc2: the value of an other accumulator, issued by another worker
    • obj: the same parameter obj passed to aggregate()
  • init: the initial value of the accumulators used by reducer() and combiner(). It should be the identity element of the operation (a neutral value: combining it with any other value should leave that value unchanged).
  • obj: user provided data, passed to the callbacks. Useful for carrying serializable data from master to workers; obj is shared amongst executions over each element of the dataset.
  • done: a callback of the form function(error, result) which is called at completion. If undefined, aggregate() returns an ES6 promise.

The following example computes the average of a dataset, avoiding a map():

sc.parallelize([3, 5, 2, 7, 4, 8])
  .aggregate((a, v) => [a[0] + v, a[1] + 1],
    (a1, a2) => [a1[0] + a2[0], a1[1] + a2[1]], [0, 0])
  .then(function(data) {
    console.log(data[0] / data[1]);
  })
// 4.8333

ds.aggregateByKey(reducer, combiner, init[, obj])

When called on a dataset of type [k,v], returns a dataset of type [k,v] where v is the aggregated value of all elements with the same key k. The aggregation is performed using two functions, reducer() and combiner(), allowing an arbitrary accumulator type, different from the element type.

  • reducer: a function of the form function(acc, val[, obj[, wc]]), which returns the next value of the accumulator (which must be of the same type as acc) and with:
    • acc: the value of the accumulator, initially set to init
    • val: the value v of the next [k,v] element of the dataset on which aggregateByKey() operates
    • obj: the same parameter obj passed to aggregateByKey()
    • wc: the worker context, a persistent object local to each worker, where user can store and access worker local dependencies.
  • combiner: a function of the form function(acc1, acc2[, obj]), which returns the merged value of accumulators and with:
    • acc1: the value of an accumulator, computed locally on a worker
    • acc2: the value of an other accumulator, issued by another worker
    • obj: the same parameter obj passed to aggregateByKey()
  • init: the initial value of the accumulators used by reducer() and combiner(). It should be the identity element of the operation (a neutral value: combining it with any other value should leave that value unchanged).
  • obj: user provided data, passed to the callbacks. Useful for carrying serializable data from master to workers; obj is shared amongst executions over each element of the dataset.

Example:

sc.parallelize([['hello', 1], ['hello', 1], ['world', 1]])
  .aggregateByKey((a, b) => a + b, (a, b) => a + b, 0)
  .collect()
  .then(console.log);
// [ [ 'hello', 2 ], [ 'world', 1 ] ]

ds.cartesian(other)

Returns a dataset which contains all possible pairs [a, b], where a is in the source dataset and b is in the other dataset.

Example:

var ds1 = sc.parallelize([1, 2]);
var ds2 = sc.parallelize(['a', 'b', 'c']);
ds1.cartesian(ds2).collect().then(console.log);
// [ [ 1, 'a' ], [ 1, 'b' ], [ 1, 'c' ],
//   [ 2, 'a' ], [ 2, 'b' ], [ 2, 'c' ] ]

ds.coGroup(other)

When called on a source dataset of type [k,v] and an other dataset of type [k,w], returns a dataset of type [k, [[v], [w]]] where, for each key, the values of both datasets are grouped together.

Example:

var ds1 = sc.parallelize([[10, 1], [20, 2]]);
var ds2 = sc.parallelize([[10, 'world'], [30, 3]]);
ds1.coGroup(ds2).collect().then(console.log);
// [ [ 10, [ [ 1 ], [ 'world' ] ] ],
//   [ 20, [ [ 2 ], [] ] ],
//   [ 30, [ [], [ 3 ] ] ] ]

ds.collect([done])

This action returns the content of the dataset in form of an array. The result is passed to the done() callback if provided, otherwise an ES6 promise is returned.

  • done: a callback of the form function(error, result) which is called at completion.

Example:

sc.parallelize([1, 2, 3, 4])
  .collect(function (err, res) {
     console.log(res);
   });
// [ 1, 2, 3, 4 ]

ds.count([done])

This action computes the number of elements in the dataset. The result is passed to the done() callback if provided, otherwise an ES6 promise is returned.

  • done: a callback of the form function(error, result) which is called at completion.

Example:

sc.parallelize([10, 20, 30, 40]).count().then(console.log);
// 4

ds.countByKey([done])

When called on a dataset of type [k,v], this action computes the number of occurrences of elements for each key. It produces an array of elements of type [k,w] where w is the count for key k. The result is passed to the done() callback if provided, otherwise an ES6 promise is returned.

  • done: a callback of the form function(error, result) which is called at completion.

Example:

sc.parallelize([[10, 1], [20, 2], [10, 4]])
  .countByKey().then(console.log);
// [ [ 10, 2 ], [ 20, 1 ] ]

ds.countByValue([done])

This action computes the number of occurrences of each element in the dataset and returns an array of elements of type [v,n], where v is the element and n its number of occurrences. The result is passed to the done() callback if provided, otherwise an ES6 promise is returned.

  • done: a callback of the form function(error, result) which is called at completion.

Example:

sc.parallelize([ 1, 2, 3, 1, 3, 2, 5 ])
  .countByValue().then(console.log);
// [ [ 1, 2 ], [ 2, 2 ], [ 3, 2 ], [ 5, 1 ] ]

ds.distinct()

Returns a dataset where duplicates are removed.

Example:

sc.parallelize([ 1, 2, 3, 1, 4, 3, 5 ])
  .distinct()
  .collect().then(console.log);
// [ 1, 2, 3, 4, 5 ]

ds.filter(filter[, obj])

  • filter: a function of the form callback(element[, obj[, wc]]), returning a Boolean and where:
    • element: the next element of the dataset on which filter() operates
    • obj: the same parameter obj passed to filter()
    • wc: the worker context, a persistent object local to each worker, where user can store and access worker local dependencies.
  • obj: user provided data, passed to the callback. Useful for carrying serializable data from master to workers; obj is shared amongst executions over each element of the dataset

Applies the provided filter function to each element of the source dataset and returns a new dataset containing the elements that passed the test.

Example:

function filter(data, obj) { return data % obj.modulo !== 0; }

sc.parallelize([1, 2, 3, 4])
  .filter(filter, {modulo: 2})
  .collect().then(console.log);
// [ 1, 3 ]

ds.first([done])

This action computes the first element in this dataset. The result is passed to the done() callback if provided, otherwise an ES6 promise is returned.

  • done: a callback of the form function(error, result) which is called at completion.

Example:

sc.parallelize([1, 2, 3]).first().then(console.log);
// 1

ds.flatMap(flatMapper[, obj])

Applies the provided flatMapper function to each element of the source dataset and returns a new dataset composed of all the elements of the arrays returned by flatMapper.

  • flatMapper: a function of the form callback(element[, obj[, wc]]), returning an Array and where:
    • element: the next element of the dataset on which flatMap() operates
    • obj: the same parameter obj passed to flatMap()
    • wc: the worker context, a persistent object local to each worker, where user can store and access worker local dependencies.
  • obj: user provided data, passed to the callback. Useful for carrying serializable data from master to workers; obj is shared amongst executions over each element of the dataset

Example:

sc.range(5).flatMap(a => [a, a]).collect().then(console.log);
// [ 0, 0, 1, 1, 2, 2, 3, 3, 4, 4 ]

ds.flatMapValues(flatMapper[, obj])

Applies the provided flatMapper function to the value of each [k,v] element of the source dataset and returns a new dataset containing, for each source element, one [k, x] element per item x of the array returned by flatMapper(v); the key is kept unchanged.

  • flatMapper: a function of the form callback(element[, obj[, wc]]), returning an Array and where:
    • element: the value v of the next [k,v] element of the dataset on which flatMapValues() operates
    • obj: the same parameter obj passed to flatMapValues()
    • wc: the worker context, a persistent object local to each worker, where user can store and access worker local dependencies.
  • obj: user provided data, passed to the callback. Useful for carrying serializable data from master to workers; obj is shared amongst executions over each element of the dataset

Example:

function valueFlatMapper(data, obj) {
    var tmp = [];
    for (var i = 0; i < obj.N; i++) tmp.push(data * obj.fact);
    return tmp;
}

sc.parallelize([['hello', 1], ['world', 2]])
  .flatMapValues(valueFlatMapper, {N: 2, fact: 2})
  .collect().then(console.log);
// [ [ 'hello', 2 ], [ 'hello', 2 ], [ 'world', 4 ], [ 'world', 4 ] ]

ds.forEach(callback[, obj][, done])

This action applies a callback function on each element of the dataset. If provided, the done() callback is invoked at completion, otherwise an ES6 promise is returned.

  • callback: a function of the form function(val[, obj[, wc]]), which returns null and with:
    • val: the value of the next element of the dataset on which forEach() operates
    • obj: the same parameter obj passed to forEach()
    • wc: the worker context, a persistent object local to each worker, where user can store and access worker local dependencies.
  • obj: user provided data, passed to the callback. Useful for carrying serializable data from master to workers; obj is shared amongst executions over each element of the dataset
  • done: a callback of the form function(error, result) which is called at completion.

In the following example, the console.log() callback provided to forEach() is executed on workers, so its output may not be visible on the master:

sc.parallelize([1, 2, 3, 4])
  .forEach(console.log)
  .then(() => console.log('finished'));

ds.groupByKey()

When called on a dataset of type [k,v], returns a dataset of type [k, [v]] where values with the same key are grouped.

Example:

sc.parallelize([[10, 1], [20, 2], [10, 4]])
  .groupByKey().collect().then(console.log);
// [ [ 10, [ 1, 4 ] ], [ 20, [ 2 ] ] ]

ds.intersection(other)

Returns a dataset containing only the elements present in both the source dataset and the other dataset.

Example:

var ds1 = sc.parallelize([1, 2, 3, 4, 5]);
var ds2 = sc.parallelize([3, 4, 5, 6, 7]);
ds1.intersection(ds2).collect().then(console.log); // [ 3, 4, 5 ]

ds.join(other)

When called on a source dataset of type [k,v] and an other dataset of type [k,w], returns a dataset of [k, [v, w]] pairs containing all pairs of values for each key present in both datasets.

Example:

var ds1 = sc.parallelize([[10, 1], [20, 2]]);
var ds2 = sc.parallelize([[10, 'world'], [30, 3]]);
ds1.join(ds2).collect().then(console.log);
// [ [ 10, [ 1, 'world' ] ] ]

ds.keys()

When called on a source dataset of type [k,v], returns a dataset containing only the keys k.

Example:

sc.parallelize([[10, 'world'], [30, 3]])
  .keys().collect().then(console.log);
// [ 10, 30 ]

ds.leftOuterJoin(other)

When called on a source dataset of type [k,v] and an other dataset of type [k,w], returns a dataset of [k, [v, w]] pairs containing all keys of the source dataset, with w set to null when the key is not present in the other dataset.

Example:

var ds1 = sc.parallelize([[10, 1], [20, 2]]);
var ds2 = sc.parallelize([[10, 'world'], [30, 3]]);
ds1.leftOuterJoin(ds2).collect().then(console.log);
// [ [ 10, [ 1, 'world' ] ], [ 20, [ 2, null ] ] ]

ds.lookup(k[, done])

When called on source dataset of type [k,v], returns an array of values v for key k. The result is passed to the done() callback if provided, otherwise an ES6 promise is returned.

  • done: a callback of the form function(error, result) which is called at completion.

Example:

sc.parallelize([[10, 'world'], [20, 2], [10, 1], [30, 3]])
  .lookup(10).then(console.log);
// [ 'world', 1 ]

ds.map(mapper[, obj])

Applies the provided mapper function to each element of the source dataset and returns a new dataset.

  • mapper: a function of the form callback(element[, obj[, wc]]), returning an element and where:
    • element: the next element of the dataset on which map() operates
    • obj: the same parameter obj passed to map()
    • wc: the worker context, a persistent object local to each worker, where user can store and access worker local dependencies.
  • obj: user provided data, passed to the callback. Useful for carrying serializable data from master to workers; obj is shared amongst executions over each element of the dataset

Example:

sc.parallelize([1, 2, 3, 4])
  .map((data, obj) => data * obj.scaling, {scaling: 1.2})
  .collect().then(console.log);
// [ 1.2, 2.4, 3.6, 4.8 ]

ds.mapValues(mapper[, obj])

  • mapper: a function of the form callback(element[, obj[, wc]]), returning an element and where:
    • element: the value v of the next [k,v] element of the dataset on which mapValues() operates
    • obj: the same parameter obj passed to mapValues()
    • wc: the worker context, a persistent object local to each worker, where user can store and access worker local dependencies
  • obj: user provided data, passed to the callback. Useful for carrying serializable data from master to workers; obj is shared amongst executions over each element of the dataset

Applies the provided mapper function to the value of each [k,v] element of the source dataset and returns a new dataset containing elements defined as [k, mapper(v)], keeping the key unchanged for each source element.

Example:

sc.parallelize([['hello', 1], ['world', 2]])
  .mapValues((a, obj) => a*obj.fact, {fact: 2})
  .collect().then(console.log);
// [ ['hello', 2], ['world', 4] ]

ds.partitionBy(partitioner)

Returns a dataset partitioned using the specified partitioner. The purpose of this transformation is not to change the dataset content, but to increase processing speed by ensuring that the elements accessed by further transformations reside in the same partition.

Example:

var skale = require('skale');
var sc = skale.context();

sc.parallelize([['hello', 1], ['world', 1], ['hello', 2], ['world', 2], ['cedric', 3]])
  .partitionBy(new skale.HashPartitioner(3))
  .collect().then(console.log)
// [ ['world', 1], ['world', 2], ['hello', 1], ['hello', 2], ['cedric', 3] ]

ds.persist()

Returns the dataset, and persists the dataset content on disk (and in memory if available) in order to directly reuse content in further tasks.

Example:

var dataset = sc.range(100).map(a => a * a).persist();

// First action: compute dataset
dataset.collect().then(console.log)

// Second action: reuse dataset, avoid map transform
dataset.collect().then(console.log)

ds.reduce(reducer, init[, obj][, done])

This action returns the aggregated value of the elements of the dataset using a reducer() function. The result is passed to the done() callback if provided, otherwise an ES6 promise is returned.

  • reducer: a function of the form function(acc, val[, obj[, wc]]), which returns the next value of the accumulator (which must be of the same type as acc and val) and with:
    • acc: the value of the accumulator, initially set to init
    • val: the value of the next element of the dataset on which reduce() operates
    • obj: the same parameter obj passed to reduce()
    • wc: the worker context, a persistent object local to each worker, where user can store and access worker local dependencies.
  • init: the initial value of the accumulators used by reducer(). It should be the identity element of the operation (a neutral value: combining it with any other value should leave that value unchanged).
  • obj: user provided data, passed to the callback. Useful for carrying serializable data from master to workers; obj is shared amongst executions over each element of the dataset
  • done: a callback of the form function(error, result) which is called at completion.

Example:

sc.parallelize([1, 2, 4, 8])
  .reduce((a, b) => a + b, 0)
  .then(console.log);
// 15

ds.reduceByKey(reducer, init[, obj])

  • reducer: a function of the form callback(acc,val[, obj[, wc]]), returning the next value of the accumulator (which must be of the same type as acc and val) and where:
    • acc: the value of the accumulator, initially set to init
    • val: the value v of the next [k,v] element of the dataset on which reduceByKey() operates
    • obj: the same parameter obj passed to reduceByKey()
    • wc: the worker context, a persistent object local to each worker, where user can store and access worker local dependencies.
  • init: the initial value of accumulator for each key. Will be passed to reducer.
  • obj: user provided data, passed to the callback. Useful for carrying serializable data from master to workers; obj is shared amongst executions over each element of the dataset

When called on a dataset of type [k,v], returns a dataset of type [k,v] where the values of each key are aggregated using the reducer function and the init initial value.

Example:

sc.parallelize([[10, 1], [10, 2], [10, 4]])
  .reduceByKey((a,b) => a+b, 0)
  .collect().then(console.log);
// [ [10, 7] ]

ds.rightOuterJoin(other)

When called on a source dataset of type [k,v] and an other dataset of type [k,w], returns a dataset of [k, [v, w]] pairs containing all keys of the other dataset, with v set to null when the key is not present in the source dataset.

Example:

var ds1 = sc.parallelize([[10, 1], [20, 2]]);
var ds2 = sc.parallelize([[10, 'world'], [30, 3]]);
ds1.rightOuterJoin(ds2).collect().then(console.log);
// [ [ 10, [ 1, 'world' ] ], [ 30, [ null, 3 ] ] ]

ds.sample(withReplacement, frac)

  • withReplacement: Boolean value, true if data must be sampled with replacement
  • frac: Number value of the fraction of source dataset to return

Returns a dataset by sampling a fraction frac of source dataset, with or without replacement.

Example:

sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8])
  .sample(true, 0.5)
  .collect().then(console.log);
// [ 1, 1, 3, 4, 4, 5, 7 ]

ds.save(url[, options][, done])

This action saves the content of the dataset to the destination URL. The destination is a flat directory which will contain as many files as partitions in the dataset. Files are named from partition numbers, starting at 0. The file format is a stream of JSON strings (one per dataset element) separated by newlines.

  • url: a String of the general form protocol://host/path or /path. See below for supported protocols
  • options: an Object with the following fields:
    • gzip: Boolean (default false) to enable gzip compression. If compression is enabled, files are suffixed with .gz
    • csv: Object (default undefined) with the following fields:
      • header: optional String to define first line
      • sep: String to define separator (default ;)
  • done: an optional callback function of the form function(error, result) called at completion. If not provided, an ES6 promise is returned.

File protocol

The URL form is file://path or simply path where path is an absolute pathname in the master host local file system.

Example:

sc.range(300).save('/tmp/results/').then(() => sc.end());
// will produce /tmp/results/0, /tmp/results/1
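
The csv option can be combined with any destination. A sketch, assuming it switches the output format from JSON strings to separated values (header and sep as documented above):

sc.parallelize([[1, 'hello'], [2, 'world']])
  .save('/tmp/results-csv/', {csv: {header: 'id;word', sep: ';'}})
  .then(() => sc.end());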

AWS S3 protocol

The URL form is s3://bucket/key. AWS credentials must be provided via the environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY.

Example:

sc.range(300).save('s3://myproject/mydataset', {gzip: true}).then(() => sc.end());
// will produce https://myproject.s3.amazonaws.com/mydataset/0.gz

Azure blob storage protocol

The URL form is wasb://container@user.blob.core.windows.net/blob_name. Azure credentials must be provided via the environment variables AZURE_STORAGE_ACCOUNT and AZURE_STORAGE_ACCESS_KEY.
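
Example, following the same pattern as for S3 (container, account and blob names below are placeholders):

sc.range(300).save('wasb://mycontainer@myaccount.blob.core.windows.net/mydataset').then(() => sc.end());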

ds.sortBy(keyfunc[, ascending])

Returns a dataset sorted by the given keyfunc.

  • keyfunc: a function of the form function(element) which returns a value used for comparison in the sort function and where element is the next element of the dataset on which sortBy() operates
  • ascending: a boolean to set the sort direction. Default: true

Example:

sc.parallelize([4, 6, 10, 5, 1, 2, 9, 7, 3, 0])
  .sortBy(a => a)
  .collect().then(console.log)
// [ 0, 1, 2, 3, 4, 5, 6, 7, 9, 10 ]

ds.sortByKey(ascending)

When called on a dataset of type [k,v], returns a dataset of type [k,v] sorted on k. The optional parameter ascending is a boolean which sets the sort direction, true by default.

Example:

sc.parallelize([['world', 2], ['cedric', 3], ['hello', 1]])
  .sortByKey()
  .collect().then(console.log)
// [['cedric', 3], ['hello', 1], ['world', 2]]

ds.stream([opt])

This action returns a readable stream of dataset content. The order of data and partitions is maintained.

  • opt: an Object with the following fields:
    • end: Boolean, when true, call sc.end() on stream end event. Default value: false.
    • gzip: Boolean, when true, enable gzip compression. Default value: false.

Example:

var s = sc.range(4).stream();
s.pipe(process.stdout);
// 0
// 1
// 2
// 3

ds.subtract(other)

Returns a dataset containing only elements of source dataset which are not in other dataset.

Example:

var ds1 = sc.parallelize([1, 2, 3, 4, 5]);
var ds2 = sc.parallelize([3, 4, 5, 6, 7]);
ds1.subtract(ds2).collect().then(console.log);
// [ 1, 2 ]

ds.take(num[, done])

This action returns an array of the first num elements of the source dataset. The result is passed to the done() callback if provided, otherwise an ES6 promise is returned.

  • num: positive integer Number of elements
  • done: a callback of the form function(error, result) which is called at completion.

Example:

sc.range(5).take(2).then(console.log);
// [ 0, 1 ]

ds.takeSample(withReplacement, num[, done])

This action returns an array with a random sample of num elements of the dataset, with or without replacement. The result is passed to the done() callback if provided, otherwise an ES6 promise is returned.

  • withReplacement: Boolean value, true if data must be sampled with replacement
  • num: positive integer Number of elements
  • done: a callback of the form function(error, result) which is called at completion.

Example:

sc.range(100).takeSample(false, 4).then(console.log);
// [ 18, 75, 4, 57 ]

ds.top(num[, done])

This action returns an array of the num top elements of the source dataset. The result is passed to the done() callback if provided, otherwise an ES6 promise is returned.

  • num: positive integer Number of elements
  • done: a callback of the form function(error, result) which is called at completion.

Example:

sc.range(5).top(2).then(console.log);
// [3, 4]

ds.union(other)

Returns a dataset that contains the union of the elements in the source dataset and the other dataset.

Example:

var ds1 = sc.parallelize([1, 2, 3, 4, 5]);
var ds2 = sc.parallelize([3, 4, 5, 6, 7]);
ds1.union(ds2).collect().then(console.log);
// [ 1, 2, 3, 4, 5, 3, 4, 5, 6, 7 ]

ds.values()

When called on a source dataset of type [k,v], returns a dataset containing only the values v.

Example:

sc.parallelize([[10, 'world'], [30, 3]])
  .values().collect().then(console.log);
// [ 'world', 3 ]

Partitioners

A partitioner is an object passed to ds.partitionBy(partitioner) which places data in partitions according to a strategy, for example hash partitioning, where data having the same key are placed in the same partition, or range partitioning, where data in the same range are in the same partition. This is useful to accelerate processing, as it limits data transfers between workers during jobs.

A partitioner object must provide the following properties:

  • numPartitions: a Number of partitions for the dataset
  • getPartitionIndex: a Function of type function(element) which returns the partition index (between 0 and numPartitions - 1) for the element of the dataset on which partitionBy() operates, as in the sketch below.
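
As an illustration, a minimal custom partitioner routing [k,v] elements by key parity (a sketch; it assumes getPartitionIndex receives the full element, as described above):

function ParityPartitioner() {
  this.numPartitions = 2;
  // Even keys go to partition 0, odd keys to partition 1
  this.getPartitionIndex = function (element) {
    return element[0] % 2;
  };
}

sc.parallelize([[1, 'a'], [2, 'b'], [3, 'c'], [4, 'd']])
  .partitionBy(new ParityPartitioner())
  .collect()
  .then(console.log);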

HashPartitioner(numPartitions)

Returns a partitioner object which implements hash based partitioning, using a checksum of the string representation of each element.

  • numPartitions: Number of partitions for this dataset

Example:

var hp = new skale.HashPartitioner(3)
var dataset = sc.range(10).partitionBy(hp)

RangePartitioner(numPartitions, keyfunc, dataset)

Returns a partitioner object which first defines ranges by sampling the dataset and then places elements by comparing them with ranges.

  • numPartitions: Number of partitions for this dataset
  • keyfunc: a function of the form function(element) which returns a value used for comparison in the sort function and where element is the next element of the dataset on which partitionBy() operates
  • dataset: the dataset object on which partitionBy() operates

Example:

var dataset = sc.range(100);
var rp = new skale.RangePartitioner(3, a => a, dataset);
var repartitioned = sc.range(10).partitionBy(rp);

Environment variables

  • SKALE_HOST: The hostname of the skale-server process in distributed mode. If unset, the master runs in standalone mode.
  • SKALE_PORT: The port of the skale-server process in distributed mode. Default value: "12346"
  • SKALE_KEY: An authentication token which may be required by the skale-server process
  • SKALE_DEBUG: set the debug trace level to one of the following values:
    • 0 or unset: no traces
    • 1: debug traces from the master side
    • 2: above traces plus worker traces
    • 3: above traces plus network protocol traces (if running in distributed mode)