Package org.openrefine.runners.local.pll
Class PLL<T>
- java.lang.Object
-
- org.openrefine.runners.local.pll.PLL<T>
-
- Direct Known Subclasses:
CroppedPLL, DropPartitionsPLL, IndexedPLL, InMemoryPLL, MapPartitionsPLL, OrderedJoinPLL, PairPLL, RecordPLL, SinglePartitionPLL, TextFilePLL, UnionPLL
public abstract class PLL<T> extends Object
A Partitioned Lazy List (PLL) is a lazily-computed immutable container data structure to represent lists of elements. It is split into contiguous partitions, enabling efficient parallel processing. It is analogous to Spark's Resilient Distributed Datasets (RDD) in spirit, but it is not designed for distributed contexts: a PLL is local to a given JVM. This removes the need for any serialization of jobs or of shuffled data. The API offered by PLL is also more modest, since its only purpose is to fulfill the requirements of the Grid interface.
Running Spark in standalone mode is only designed for local testing and does not remove the overhead of serialization and scheduling.
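The idea of a list split into contiguous, lazily computed partitions can be sketched in plain Java. This is only an illustration of the concept, not the PLL implementation: the class LazyPartitionedList and its methods are hypothetical names, and partitions are modelled as suppliers of iterators.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical stand-in for the concept; not the actual PLL class.
class LazyPartitionedList<T> {
    // Each partition is a recipe for producing its elements, evaluated on demand.
    private final List<Supplier<Iterator<T>>> partitions;

    LazyPartitionedList(List<Supplier<Iterator<T>>> partitions) {
        this.partitions = partitions;
    }

    static <T> LazyPartitionedList<T> ofLists(List<List<T>> lists) {
        List<Supplier<Iterator<T>>> suppliers = new ArrayList<>();
        for (List<T> list : lists) {
            suppliers.add(list::iterator);
        }
        return new LazyPartitionedList<>(suppliers);
    }

    // Deriving a new list does no work: it only wraps the partition recipes.
    <U> LazyPartitionedList<U> map(Function<T, U> f) {
        List<Supplier<Iterator<U>>> mapped = new ArrayList<>();
        for (Supplier<Iterator<T>> partition : partitions) {
            mapped.add(() -> {
                Iterator<T> source = partition.get();
                return new Iterator<U>() {
                    public boolean hasNext() { return source.hasNext(); }
                    public U next() { return f.apply(source.next()); }
                };
            });
        }
        return new LazyPartitionedList<>(mapped);
    }

    // Only an action such as collect() actually enumerates the partitions.
    List<T> collect() {
        List<T> result = new ArrayList<>();
        for (Supplier<Iterator<T>> partition : partitions) {
            partition.get().forEachRemaining(result::add);
        }
        return result;
    }

    int numPartitions() {
        return partitions.size();
    }

    public static void main(String[] args) {
        LazyPartitionedList<Integer> list = ofLists(List.of(List.of(1, 2), List.of(3)));
        System.out.println(list.map(x -> x * 10).collect()); // prints [10, 20, 30]
    }
}
```

Because everything stays inside one JVM, the partition recipes are ordinary closures and nothing needs to be serialized.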
- Author:
- Antonin Delpeuch
-
-
Nested Class Summary
Modifier and Type Class Description
protected static class PLL.LastFlush
static class PLL.PLLExecutionError
-
Field Summary
Modifier and Type Field Description
protected io.vavr.collection.Array<io.vavr.collection.Array<T>> cachedPartitions
protected PLLContext context
protected long id
protected String name
-
Constructor Summary
Constructor Description
PLL(PLLContext context, String name)
-
Method Summary
Modifier and Type Method Description
<U> U aggregate(U initialValue, BiFunction<U,T,U> map, BiFunction<U,U,U> combine)
- Runs an associative aggregation function on the list.
PLL<List<T>> batchPartitions(int batchSize)
- Groups elements by batches of the desired size, in each partition.
ProgressingFuture<Void> cacheAsync()
- Loads the contents of all partitions in memory.
io.vavr.collection.Array<T> collect()
protected ProgressingFuture<io.vavr.collection.Array<io.vavr.collection.Array<T>>> collectPartitionsAsync()
- Retrieves the contents of all partitions.
protected abstract CloseableIterator<T> compute(Partition partition)
- Iterate over the elements of the given partition.
protected io.vavr.collection.Array<Long> computePartitionSizes()
PLL<T> concatenate(List<PLL<T>> others)
- Concatenates other PLLs at the end of this one, resulting in a new PLL.
PLL<T> concatenate(PLL<T> other)
- Concatenates another PLL at the end of this one, resulting in a new PLL.
long count()
PLL<T> dropFirstElements(long n)
- Drops the first n elements at the beginning of the collection.
PLL<T> dropLastElements(long n)
- Drops the last n elements at the end of the collection.
PLL<T> filter(Predicate<? super T> filterPredicate)
- Derives a new PLL by filtering the collection to only contain elements which match the supplied predicate.
<U> PLL<U> flatMap(Function<T,CloseableIterator<U>> mapFunction, String mapDescription)
- Derives a new PLL by applying a map function on each element, which can return multiple elements of the new list.
protected PLLContext getContext()
long getId()
abstract List<PLL<?>> getParents()
- Returns the PLLs that this PLL depends on, to compute its contents.
abstract io.vavr.collection.Array<? extends Partition> getPartitions()
io.vavr.collection.Array<Long> getPartitionSizes()
- Returns the number of elements in each partition.
QueryTree getQueryTree()
boolean hasCachedPartitionSizes()
- Is this PLL aware of the size of its partitions?
boolean isCached()
- Are the contents of this collection loaded in memory?
boolean isEmpty()
- Is this list empty?
CloseableIterator<T> iterate(Partition partition)
- Iterate over the elements of the given partition.
protected CloseableIterator<T> iterateFromPartition(int partitionId)
- Stream over the part of the collection that starts at given partition boundary.
CloseableIterator<T> iterator()
- Returns an iterator over the list
PLL<T> limitPartitions(long limit)
- Limit each partition to contain only their first N elements.
<U> PLL<U> map(Function<T,U> mapFunction, String mapDescription)
- Derives a new PLL by applying a map function on each element of this list.
<U> PLL<U> mapPartitions(BiFunction<Integer,CloseableIterator<T>,CloseableIterator<U>> map, String mapDescription, boolean preservesSizes)
- Maps each partition by applying an arbitrary function to it.
<K,V> PairPLL<K,V> mapToPair(Function<T,Tuple2<K,V>> mapFunction)
- Deprecated. Use mapToPair(Function, String) to also provide a description of the map function applied.
<K,V> PairPLL<K,V> mapToPair(Function<T,Tuple2<K,V>> mapFunction, String mapDescription)
- Maps this collection to an indexed PLL.
int numPartitions()
PLL<T> retainPartitions(List<Integer> partitionIds)
- Only retain partitions designated by the given list of indices.
<U> io.vavr.collection.Array<U> runOnPartitions(Function<Partition,U> partitionFunction, int maxConcurrency)
- Runs a task in parallel on all partitions.
protected <U> io.vavr.collection.Array<U> runOnPartitions(Function<Partition,U> partitionFunction, io.vavr.collection.Iterator<? extends Partition> partitions, int maxConcurrency)
- Run a task in parallel on a selection of partitions.
<U> ProgressingFuture<io.vavr.collection.Array<U>> runOnPartitionsAsync(BiFunction<Partition,TaskSignalling,U> partitionFunction, int maxConcurrency)
- Runs a task in parallel on all partitions, asynchronously.
protected <U> ProgressingFuture<io.vavr.collection.Array<U>> runOnPartitionsAsync(BiFunction<Partition,TaskSignalling,U> partitionFunction, io.vavr.collection.Iterator<? extends Partition> partitions, int maxConcurrency)
- Run a task in parallel on a selection of partitions, asynchronously.
<U> io.vavr.collection.Array<U> runOnPartitionsWithoutInterruption(Function<Partition,U> partitionFunction)
- Same as runOnPartitions(Function, int) but wrapping any InterruptedException in an unchecked PLL.PLLExecutionError.
protected <U> io.vavr.collection.Array<U> runOnPartitionsWithoutInterruption(Function<Partition,U> partitionFunction, io.vavr.collection.Iterator<? extends Partition> partitions)
- Same as runOnPartitions(Function, Iterator, int) but wrapping any InterruptedException as an unchecked PLL.PLLExecutionError.
void saveAsTextFile(String path, int maxConcurrency, boolean repartition, boolean flushRegularly)
- Write the PLL to a directory, containing one file for each partition.
ProgressingFuture<Void> saveAsTextFileAsync(String path, int maxConcurrency, boolean repartition, boolean flushRegularly)
- Write the PLL to a directory, writing one file per partition, in an asynchronous way.
<S,U> PLL<U> scanMap(S initialState, Function<T,S> feed, BiFunction<S,S,S> combine, BiFunction<S,T,U> map)
- Applies a map function on the list, such that the map function is able to keep a state from one element to the other.
protected static <T,S,U> CloseableIterator<U> scanMapStream(CloseableIterator<T> iterator, S initialState, Function<T,S> feed, BiFunction<S,S,S> combine, BiFunction<S,T,U> map)
PLL<T> sort(Comparator<T> comparator)
- Sorts the collection using the supplied comparator.
io.vavr.collection.List<T> take(int num)
- Returns the n first elements of the list (or less if there are less elements in the list).
String toString()
void uncache()
- Unloads the partition contents from memory
PLL<T> withCachedPartitionSizes(io.vavr.collection.Array<Long> partitionSizes)
- Sets the partition sizes if they are already known by the user.
protected void writeOriginalPartition(Partition partition, File directory, Optional<TaskSignalling> taskSignalling, boolean flushRegularly)
protected void writePartition(int partitionIndex, CloseableIterator<T> iterator, File partFile, Optional<TaskSignalling> taskSignalling, boolean flushRegularly)
protected void writePlannedPartition(org.openrefine.runners.local.pll.PLL.PlannedPartition partition, File directory, io.vavr.collection.Iterator<T> choppedIterator, Optional<TaskSignalling> taskSignalling, boolean flushRegularly)
PairPLL<Long,T> zipWithIndex()
- Indexes the collection in sequential order.
-
-
-
Field Detail
-
context
protected final PLLContext context
-
id
protected final long id
-
name
protected final String name
-
cachedPartitions
protected io.vavr.collection.Array<io.vavr.collection.Array<T>> cachedPartitions
-
-
Constructor Detail
-
PLL
public PLL(PLLContext context, String name)
-
-
Method Detail
-
compute
protected abstract CloseableIterator<T> compute(Partition partition)
Iterate over the elements of the given partition. This is the method that should be implemented by subclasses. As this method forces computation, ignoring any caching, consumers should not call it directly but rather use iterate(Partition). Once the iterator is no longer needed, it should be closed. This makes it possible to release the underlying resources supporting it, such as open files or sockets.
- Parameters:
partition
- the partition to iterate over
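The requirement to close iterators can be illustrated with a try-with-resources block. The sketch below is an assumption-laden illustration in plain Java: TrackingIterator is a hypothetical class standing in for an iterator that holds a resource, and it is not the CloseableIterator type used by PLL.

```java
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch; TrackingIterator is not part of the PLL API.
class CloseIteratorSketch {
    // An iterator that owns a resource and records whether it was released.
    static class TrackingIterator<T> implements Iterator<T>, AutoCloseable {
        private final Iterator<T> delegate;
        private boolean closed = false;

        TrackingIterator(Iterator<T> delegate) {
            this.delegate = delegate;
        }

        public boolean hasNext() { return !closed && delegate.hasNext(); }
        public T next() { return delegate.next(); }

        @Override
        public void close() { closed = true; } // a real iterator would close files or sockets here

        boolean isClosed() { return closed; }
    }

    // Consumes the iterator and guarantees it is closed, even if an exception is thrown.
    static int countElements(TrackingIterator<?> iterator) {
        int count = 0;
        try (TrackingIterator<?> it = iterator) {
            while (it.hasNext()) {
                it.next();
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        TrackingIterator<String> it = new TrackingIterator<>(List.of("a", "b").iterator());
        System.out.println(countElements(it) + " elements, closed: " + it.isClosed());
    }
}
```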
-
numPartitions
public int numPartitions()
- Returns:
- the number of partitions in this list
-
getPartitions
public abstract io.vavr.collection.Array<? extends Partition> getPartitions()
- Returns:
- the partitions in this list
-
iterate
public CloseableIterator<T> iterate(Partition partition)
Iterate over the elements of the given partition. If the contents of this PLL have been cached, this will iterate from the cache instead.
- Parameters:
partition
- the partition to iterate over
-
count
public long count()
- Returns:
- the total number of elements
-
getPartitionSizes
public final io.vavr.collection.Array<Long> getPartitionSizes()
Returns the number of elements in each partition. See hasCachedPartitionSizes() to check if those sizes are already computed. Subclasses should rather override computePartitionSizes() if they can do this computation more efficiently than by iterating over the partitions.
-
computePartitionSizes
protected io.vavr.collection.Array<Long> computePartitionSizes()
-
collect
public io.vavr.collection.Array<T> collect()
- Returns:
- the list of all elements in the list, retrieved in memory.
-
collectPartitionsAsync
protected ProgressingFuture<io.vavr.collection.Array<io.vavr.collection.Array<T>>> collectPartitionsAsync()
Retrieves the contents of all partitions. This does not store them in the local cache, so two successive calls to this method will enumerate the contents of the PLL twice.
-
iterator
public CloseableIterator<T> iterator()
Returns an iterator over the list
-
iterateFromPartition
protected CloseableIterator<T> iterateFromPartition(int partitionId)
Stream over the part of the collection that starts at the given partition boundary.
- Parameters:
partitionId
- the index of the partition to start enumerating from
-
isEmpty
public boolean isEmpty()
Is this list empty?
-
take
public io.vavr.collection.List<T> take(int num)
Returns the first num elements of the list (or fewer if the list contains fewer elements).
-
aggregate
public <U> U aggregate(U initialValue, BiFunction<U,T,U> map, BiFunction<U,U,U> combine)
Runs an associative aggregation function on the list.
- Type Parameters:
U
- Parameters:
initialValue
- the neutral value for the combine operation
map
- a function taking the current state and a list element, and returning the updated aggregation
combine
- the associative operator
- Returns:
- the aggregated value over the entire list
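The roles of the three arguments can be sketched on plain Java lists. This is a sequential illustration under stated assumptions, not the PLL implementation: partitions are modelled as nested lists, AggregateSketch is a hypothetical name, and each partition is folded independently before the partial results are merged.

```java
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical sketch of a partition-wise aggregation.
class AggregateSketch {
    static <T, U> U aggregate(List<List<T>> partitions, U initialValue,
            BiFunction<U, T, U> map, BiFunction<U, U, U> combine) {
        U total = initialValue;
        for (List<T> partition : partitions) {
            // Fold each partition on its own, starting from the neutral value.
            // These folds are independent, so they could run in parallel.
            U partial = initialValue;
            for (T element : partition) {
                partial = map.apply(partial, element);
            }
            // Merge the partial results in partition order.
            total = combine.apply(total, partial);
        }
        return total;
    }

    public static void main(String[] args) {
        List<List<String>> partitions = List.of(List.of("a", "bb"), List.of("ccc"));
        // Sum of string lengths: 0 is neutral for addition, and addition is associative.
        int totalLength = aggregate(partitions, 0, (sum, s) -> sum + s.length(), Integer::sum);
        System.out.println(totalLength); // prints 6
    }
}
```

The initial value is used once per partition, which is why it has to be neutral for the combine operation.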
-
map
public <U> PLL<U> map(Function<T,U> mapFunction, String mapDescription)
Derives a new PLL by applying a map function on each element of this list. The function is applied lazily, so it can be called multiple times on the same element, depending on the actions called on the returned PLL.
- Type Parameters:
U
- Parameters:
mapFunction
- the function to apply on each element
mapDescription
- a short description of the function, for debugging purposes
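The consequence of lazy application can be shown with a call counter. In this sketch (plain Java, hypothetical names, not the PLL API), the mapped view is a supplier that recomputes its contents on every enumeration, so the map function runs once per element per enumeration.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

// Hypothetical sketch of a lazily mapped list without caching.
class LazyMapSketch {
    // Returns a view which applies f each time its contents are requested.
    static <T, U> Supplier<List<U>> lazyMap(List<T> source, Function<T, U> f) {
        return () -> source.stream().map(f).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        Supplier<List<Integer>> doubled = lazyMap(List.of(1, 2, 3), x -> {
            calls.incrementAndGet();
            return x * 2;
        });
        System.out.println(calls.get()); // prints 0: nothing has been computed yet
        doubled.get();
        doubled.get();
        System.out.println(calls.get()); // prints 6: two enumerations of three elements
    }
}
```

For this reason a map function with side effects, or an expensive one, is a candidate for caching the derived list before running several actions on it.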
-
flatMap
public <U> PLL<U> flatMap(Function<T,CloseableIterator<U>> mapFunction, String mapDescription)
Derives a new PLL by applying a map function on each element, which can return multiple elements of the new list. The function is applied lazily, so it can be called multiple times on the same element, depending on the actions called on the returned PLL.
- Type Parameters:
U
- Parameters:
mapFunction
-
batchPartitions
public PLL<List<T>> batchPartitions(int batchSize)
Groups elements by batches of the desired size, in each partition. At the end of partitions, groups of smaller size may be created even if there are more elements in the following partitions.
- Parameters:
batchSize
- the desired maximal size of batches
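The behaviour at partition boundaries can be sketched on nested Java lists. This is an illustration only, with hypothetical names and partitions modelled as lists; it is not the PLL implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of batching that never crosses partition boundaries.
class BatchSketch {
    static <T> List<List<List<T>>> batchPartitions(List<List<T>> partitions, int batchSize) {
        List<List<List<T>>> result = new ArrayList<>();
        for (List<T> partition : partitions) {
            List<List<T>> batches = new ArrayList<>();
            for (int start = 0; start < partition.size(); start += batchSize) {
                // The last batch of a partition may be shorter than batchSize.
                int end = Math.min(start + batchSize, partition.size());
                batches.add(new ArrayList<>(partition.subList(start, end)));
            }
            result.add(batches);
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<Integer>> partitions = List.of(List.of(1, 2, 3), List.of(4, 5));
        // The element 3 forms a batch on its own, although 4 follows in the next partition.
        System.out.println(batchPartitions(partitions, 2)); // prints [[[1, 2], [3]], [[4, 5]]]
    }
}
```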
-
mapPartitions
public <U> PLL<U> mapPartitions(BiFunction<Integer,CloseableIterator<T>,CloseableIterator<U>> map, String mapDescription, boolean preservesSizes)
Maps each partition by applying an arbitrary function to it.
- Type Parameters:
U
- Parameters:
map
- the function to apply on the stream of elements of the partition
mapDescription
- a short description of the map function, for debugging purposes
preservesSizes
- whether the size of each partition will be preserved by the map
-
scanMap
public <S,U> PLL<U> scanMap(S initialState, Function<T,S> feed, BiFunction<S,S,S> combine, BiFunction<S,T,U> map)
Applies a map function on the list, such that the map function is able to keep a state from one element to the other. This state is required to be combinable with an associative and unital function.
- Type Parameters:
S
- the type of the state kept by the mapper
U
- the type of elements returned by the mapper
- Parameters:
initialState
- the initial state
feed
- the function to update the state after each element
combine
- the function to combine two states, at partition boundaries
map
- the mapper itself
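One way to read these parameters is as a two-pass computation. The sketch below uses plain Java lists and hypothetical names, and it assumes that the mapper sees the state accumulated over all preceding elements (this ordering is an assumption, not something stated above). The first pass computes the state at the start of each partition, and the second pass maps each partition from that state.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical sketch of a stateful map over partitions.
class ScanMapSketch {
    static <T, S, U> List<List<U>> scanMap(List<List<T>> partitions, S initialState,
            Function<T, S> feed, BiFunction<S, S, S> combine, BiFunction<S, T, U> map) {
        // First pass: compute the state holding at the start of each partition.
        List<S> startStates = new ArrayList<>();
        S running = initialState;
        for (List<T> partition : partitions) {
            startStates.add(running);
            S partitionState = initialState;
            for (T element : partition) {
                partitionState = combine.apply(partitionState, feed.apply(element));
            }
            running = combine.apply(running, partitionState);
        }
        // Second pass: each partition can now be mapped independently of the others.
        List<List<U>> result = new ArrayList<>();
        for (int i = 0; i < partitions.size(); i++) {
            S state = startStates.get(i);
            List<U> mapped = new ArrayList<>();
            for (T element : partitions.get(i)) {
                mapped.add(map.apply(state, element));
                state = combine.apply(state, feed.apply(element));
            }
            result.add(mapped);
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<String>> partitions = List.of(List.of("a", "b"), List.of("c"));
        // State: number of elements seen so far; each element is prefixed with it.
        List<List<String>> numbered =
                scanMap(partitions, 0, s -> 1, Integer::sum, (count, s) -> count + ":" + s);
        System.out.println(numbered); // prints [[0:a, 1:b], [2:c]]
    }
}
```

Associativity of combine is what allows the per-partition states of the first pass to be computed separately and merged afterwards.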
-
scanMapStream
protected static <T,S,U> CloseableIterator<U> scanMapStream(CloseableIterator<T> iterator, S initialState, Function<T,S> feed, BiFunction<S,S,S> combine, BiFunction<S,T,U> map)
-
filter
public PLL<T> filter(Predicate<? super T> filterPredicate)
Derives a new PLL by filtering the collection to only contain elements which match the supplied predicate. The predicate is evaluated lazily, so it can be called multiple times on the same element, depending on the actions called on the returned PLL.
- Parameters:
filterPredicate
-
mapToPair
@Deprecated public <K,V> PairPLL<K,V> mapToPair(Function<T,Tuple2<K,V>> mapFunction)
Deprecated. Use mapToPair(Function, String) to also provide a description of the map function applied.
Maps this collection to an indexed PLL. This does not come with any partitioner, so it is only indexed in the sense that it offers specific methods for collections of pairs.
- Type Parameters:
K
V
- Parameters:
mapFunction
-
mapToPair
public <K,V> PairPLL<K,V> mapToPair(Function<T,Tuple2<K,V>> mapFunction, String mapDescription)
Maps this collection to an indexed PLL. This does not come with any partitioner, so it is only indexed in the sense that it offers specific methods for collections of pairs.
- Type Parameters:
K
V
- Parameters:
mapFunction
- the function to apply on each element
mapDescription
- a short description of the map function being applied, for debugging purposes
-
zipWithIndex
public PairPLL<Long,T> zipWithIndex()
Indexes the collection in sequential order. This creates a partitioner, making it efficient to retrieve an element by index with PairPLL.get(K).
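Why sequential indices make lookups efficient can be sketched as follows. This is a guess at the general idea, not the actual partitioner: if the index of the first element of each partition is known, the partition holding a given index can be located without enumerating the others. All names below are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of locating a partition from an element index.
class IndexPartitionerSketch {
    // firstIndices[i] is the index of the first element of partition i.
    static long[] firstIndices(List<Long> partitionSizes) {
        long[] offsets = new long[partitionSizes.size()];
        long next = 0;
        for (int i = 0; i < offsets.length; i++) {
            offsets[i] = next;
            next += partitionSizes.get(i);
        }
        return offsets;
    }

    // Returns the position of the partition holding the index, or -1 if out of range.
    static int partitionFor(long index, long[] offsets, long totalSize) {
        if (index < 0 || index >= totalSize) {
            return -1;
        }
        int found = -1;
        for (int i = 0; i < offsets.length; i++) {
            if (offsets[i] <= index) {
                found = i; // the last partition starting at or before the index wins
            } else {
                break;
            }
        }
        return found;
    }

    public static void main(String[] args) {
        long[] offsets = firstIndices(List.of(3L, 0L, 2L));
        System.out.println(Arrays.toString(offsets)); // prints [0, 3, 3]
        System.out.println(partitionFor(3, offsets, 5)); // prints 2
    }
}
```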
-
sort
public PLL<T> sort(Comparator<T> comparator)
Sorts the collection using the supplied comparator. This fetches the entire collection in memory.
- Parameters:
comparator
-
concatenate
public PLL<T> concatenate(PLL<T> other)
Concatenates another PLL at the end of this one, resulting in a new PLL. The new PLL has the union of the partitions of both original PLLs as partition set.
- Parameters:
other
- the list of elements to add at the end of this one
-
concatenate
public PLL<T> concatenate(List<PLL<T>> others)
Concatenates other PLLs at the end of this one, resulting in a new PLL. The new PLL has the union of the partitions of all original PLLs as partition set.
- Parameters:
others
- the list of PLLs to add at the end of this one
-
limitPartitions
public PLL<T> limitPartitions(long limit)
Limits each partition to its first N elements.
- Parameters:
limit
- the maximum number of items per partition
-
retainPartitions
public PLL<T> retainPartitions(List<Integer> partitionIds)
Only retain partitions designated by the given list of indices. The other partitions are dropped from the PLL. The partitions in the resulting PLL are ordered according to the list of indices supplied.
- Parameters:
partitionIds
- the indices of the partitions to retain
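The ordering rule can be sketched in a few lines of plain Java. The names are hypothetical and partitions are represented by arbitrary objects; the point is only that the result follows the order of the supplied indices, not the original partition order.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of selecting partitions by index.
class RetainPartitionsSketch {
    static <P> List<P> retainPartitions(List<P> partitions, List<Integer> partitionIds) {
        List<P> retained = new ArrayList<>();
        for (int id : partitionIds) {
            // An invalid index makes List.get throw IndexOutOfBoundsException.
            retained.add(partitions.get(id));
        }
        return retained;
    }

    public static void main(String[] args) {
        List<String> partitions = List.of("p0", "p1", "p2", "p3");
        System.out.println(retainPartitions(partitions, List.of(3, 1))); // prints [p3, p1]
    }
}
```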
-
dropFirstElements
public PLL<T> dropFirstElements(long n)
Drops the first n elements at the beginning of the collection.
- Parameters:
n
- the number of elements to remove
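On a partitioned list, dropping a prefix can be done without touching most elements when partition sizes are known. The following sketch shows one possible strategy on plain Java lists, with hypothetical names; whether PLL handles fully dropped partitions this way is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of dropping a prefix from a partitioned list.
class DropFirstSketch {
    static <T> List<List<T>> dropFirstElements(List<List<T>> partitions, long n) {
        List<List<T>> result = new ArrayList<>();
        long remaining = Math.max(n, 0);
        for (List<T> partition : partitions) {
            if (remaining >= partition.size()) {
                // The whole partition falls within the dropped prefix: skip it entirely.
                remaining -= partition.size();
            } else {
                // Crop the start of this partition; nothing is left to drop afterwards.
                result.add(new ArrayList<>(partition.subList((int) remaining, partition.size())));
                remaining = 0;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<Integer>> partitions = List.of(List.of(1, 2), List.of(3, 4, 5), List.of(6));
        System.out.println(dropFirstElements(partitions, 3)); // prints [[4, 5], [6]]
    }
}
```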
-
dropLastElements
public PLL<T> dropLastElements(long n)
Drops the last n elements at the end of the collection.
- Parameters:
n
- the number of elements to remove at the end
-
cacheAsync
public ProgressingFuture<Void> cacheAsync()
Loads the contents of all partitions in memory.
-
uncache
public void uncache()
Unloads the partition contents from memory
-
isCached
public boolean isCached()
Are the contents of this collection loaded in memory?
-
hasCachedPartitionSizes
public boolean hasCachedPartitionSizes()
Is this PLL aware of the size of its partitions?
-
withCachedPartitionSizes
public PLL<T> withCachedPartitionSizes(io.vavr.collection.Array<Long> partitionSizes)
Sets the partition sizes if they are already known by the user.
- Parameters:
partitionSizes
-
saveAsTextFile
public void saveAsTextFile(String path, int maxConcurrency, boolean repartition, boolean flushRegularly) throws IOException, InterruptedException
Write the PLL to a directory, containing one file for each partition.
- Throws:
IOException
InterruptedException
-
saveAsTextFileAsync
public ProgressingFuture<Void> saveAsTextFileAsync(String path, int maxConcurrency, boolean repartition, boolean flushRegularly)
Write the PLL to a directory, writing one file per partition, in an asynchronous way.
- Parameters:
path
- the directory to which to write the PLL
maxConcurrency
- the maximum number of partitions to write concurrently
repartition
- whether to re-arrange partitions to make them more balanced (merging small partitions together, splitting large partitions into smaller ones).
flushRegularly
- whether the file should be written to disk often (at the cost of a lower compression ratio, and more disk writes)
- Returns:
- a future for the saving process, which supports progress reporting and pausing.
-
writeOriginalPartition
protected void writeOriginalPartition(Partition partition, File directory, Optional<TaskSignalling> taskSignalling, boolean flushRegularly) throws IOException
- Throws:
IOException
-
writePlannedPartition
protected void writePlannedPartition(org.openrefine.runners.local.pll.PLL.PlannedPartition partition, File directory, io.vavr.collection.Iterator<T> choppedIterator, Optional<TaskSignalling> taskSignalling, boolean flushRegularly) throws IOException
- Throws:
IOException
-
writePartition
protected void writePartition(int partitionIndex, CloseableIterator<T> iterator, File partFile, Optional<TaskSignalling> taskSignalling, boolean flushRegularly) throws IOException
- Throws:
IOException
-
runOnPartitions
public <U> io.vavr.collection.Array<U> runOnPartitions(Function<Partition,U> partitionFunction, int maxConcurrency) throws InterruptedException
Runs a task in parallel on all partitions.
- Type Parameters:
U
- return type of the function to be applied to all partitions
- Parameters:
partitionFunction
- the function to be applied to all partitions
maxConcurrency
- the maximum number of tasks to run in parallel. Set to 0 for no limit.
- Throws:
InterruptedException
-
runOnPartitionsAsync
public <U> ProgressingFuture<io.vavr.collection.Array<U>> runOnPartitionsAsync(BiFunction<Partition,TaskSignalling,U> partitionFunction, int maxConcurrency)
Runs a task in parallel on all partitions, asynchronously.
- Type Parameters:
U
- return type of the function to be applied to all partitions
- Parameters:
partitionFunction
- the function to be applied to all partitions
maxConcurrency
- the maximum number of tasks to run in parallel. Set to 0 for no limit.
-
runOnPartitionsWithoutInterruption
public <U> io.vavr.collection.Array<U> runOnPartitionsWithoutInterruption(Function<Partition,U> partitionFunction)
Same as runOnPartitions(Function, int) but wrapping any InterruptedException in an unchecked PLL.PLLExecutionError.
- Type Parameters:
U
- Parameters:
partitionFunction
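The relationship between the interruptible variant and the one that wraps InterruptedException can be sketched with a standard ExecutorService. This is a minimal illustration under assumptions, not the PLL implementation: the class, the ExecutionError wrapper and the method names are hypothetical, partitions are arbitrary objects, and a concurrency of 0 is taken to mean one thread per partition.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Hypothetical sketch of running one task per partition with bounded concurrency.
class RunOnPartitionsSketch {
    // Hypothetical unchecked wrapper, playing the role described for PLL.PLLExecutionError.
    static class ExecutionError extends RuntimeException {
        ExecutionError(Throwable cause) {
            super(cause);
        }
    }

    static <P, U> List<U> runOnPartitions(List<P> partitions, Function<P, U> task, int maxConcurrency)
            throws InterruptedException {
        List<U> results = new ArrayList<>();
        if (partitions.isEmpty()) {
            return results;
        }
        // Assumption: 0 means no limit, modelled here as one thread per partition.
        int threads = maxConcurrency > 0 ? Math.min(maxConcurrency, partitions.size()) : partitions.size();
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        try {
            List<Future<U>> futures = new ArrayList<>();
            for (P partition : partitions) {
                Callable<U> call = () -> task.apply(partition);
                futures.add(executor.submit(call));
            }
            // Reading the futures in submission order keeps results in partition order.
            for (Future<U> future : futures) {
                results.add(future.get());
            }
            return results;
        } catch (ExecutionException e) {
            throw new ExecutionError(e.getCause());
        } finally {
            executor.shutdownNow();
        }
    }

    // Same computation, but callers do not have to declare InterruptedException.
    static <P, U> List<U> runWithoutInterruption(List<P> partitions, Function<P, U> task) {
        try {
            return runOnPartitions(partitions, task, 0);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt status visible to callers
            throw new ExecutionError(e);
        }
    }

    public static void main(String[] args) {
        List<List<Integer>> partitions = List.of(List.of(1, 2), List.of(3));
        System.out.println(runWithoutInterruption(partitions, List::size)); // prints [2, 1]
    }
}
```

The unchecked variant is convenient inside lambdas and iterators, where checked exceptions cannot be declared.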
-
runOnPartitions
protected <U> io.vavr.collection.Array<U> runOnPartitions(Function<Partition,U> partitionFunction, io.vavr.collection.Iterator<? extends Partition> partitions, int maxConcurrency) throws InterruptedException
Run a task in parallel on a selection of partitions.
- Type Parameters:
U
- return type of the function to be applied to all partitions
- Parameters:
partitionFunction
- the function to be applied to all partitions
partitions
- the partitions to apply the function on
maxConcurrency
- the maximum number of tasks to run in parallel. Set to 0 for no limit.
- Throws:
InterruptedException
-
runOnPartitionsAsync
protected <U> ProgressingFuture<io.vavr.collection.Array<U>> runOnPartitionsAsync(BiFunction<Partition,TaskSignalling,U> partitionFunction, io.vavr.collection.Iterator<? extends Partition> partitions, int maxConcurrency)
Run a task in parallel on a selection of partitions, asynchronously.
- Type Parameters:
U
- return type of the function to be applied to all partitions
- Parameters:
partitionFunction
- the function to be applied to all partitions
partitions
- the partitions to apply the function on
maxConcurrency
- the maximum number of tasks to run in parallel. Set to 0 for no limit.
-
runOnPartitionsWithoutInterruption
protected <U> io.vavr.collection.Array<U> runOnPartitionsWithoutInterruption(Function<Partition,U> partitionFunction, io.vavr.collection.Iterator<? extends Partition> partitions)
Same as runOnPartitions(Function, Iterator, int) but wrapping any InterruptedException as an unchecked PLL.PLLExecutionError.
-
getContext
protected PLLContext getContext()
-
getId
public long getId()
- Returns:
- a numerical id for the PLL allocated by its context
-
getParents
public abstract List<PLL<?>> getParents()
Returns the PLLs that this PLL depends on, to compute its contents. This is used for debugging purposes, to display the tree of dependencies of a given PLL.
- See Also:
getQueryTree()
-
getQueryTree
public QueryTree getQueryTree()
- Returns:
- a tree-based representation of the dependencies of this PLL.
-
-