Class PLL<T>

  • Direct Known Subclasses:
    CroppedPLL, DropPartitionsPLL, IndexedPLL, InMemoryPLL, MapPartitionsPLL, OrderedJoinPLL, PairPLL, RecordPLL, SinglePartitionPLL, TextFilePLL, UnionPLL

    public abstract class PLL<T>
    extends Object
    A Partitioned Lazy List (PLL) is a lazily-computed immutable container data structure to represent lists of elements.

    It is split into contiguous partitions, enabling efficient parallel processing. It is analogous to Spark's Resilient Distributed Datasets (RDD) in spirit, but it is not designed for distributed contexts: a PLL is local to a given JVM. This removes the need for any serialization of jobs or of shuffled data. The API offered by PLL is also more modest, since its only purpose is to fulfill the requirements of the Grid interface.

    Running Spark in standalone mode is only designed for local testing and does not remove the overhead of serialization and scheduling.
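    To illustrate the lazy, partitioned model described above, the following toy sketch treats each partition as a recipe that produces its elements on demand. MiniPLL and its methods are hypothetical and deliberately much simpler than PLL: there is no caching, no closeable iterators and no parallelism. It only shows that map() composes recipes, while collect() is what triggers computation.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical toy model of a partitioned lazy list, not the OpenRefine API.
final class MiniPLL<T> {
    // Each partition is a recipe producing a fresh iterator whenever it is called.
    private final List<Supplier<Iterator<T>>> partitions;

    MiniPLL(List<Supplier<Iterator<T>>> partitions) {
        this.partitions = partitions;
    }

    // Wraps in-memory partitions; nothing is copied.
    static <E> MiniPLL<E> of(List<List<E>> data) {
        List<Supplier<Iterator<E>>> parts = new ArrayList<>();
        for (List<E> partition : data) {
            parts.add(partition::iterator);
        }
        return new MiniPLL<>(parts);
    }

    int numPartitions() {
        return partitions.size();
    }

    // Lazy: only composes f with each recipe; f runs when elements are enumerated.
    <U> MiniPLL<U> map(Function<T, U> f) {
        List<Supplier<Iterator<U>>> mapped = new ArrayList<>();
        for (Supplier<Iterator<T>> source : partitions) {
            mapped.add(() -> {
                Iterator<T> it = source.get();
                return new Iterator<U>() {
                    @Override
                    public boolean hasNext() {
                        return it.hasNext();
                    }

                    @Override
                    public U next() {
                        return f.apply(it.next());
                    }
                };
            });
        }
        return new MiniPLL<>(mapped);
    }

    // Forces computation: enumerates all partitions in order.
    List<T> collect() {
        List<T> result = new ArrayList<>();
        for (Supplier<Iterator<T>> partition : partitions) {
            partition.get().forEachRemaining(result::add);
        }
        return result;
    }
}
```

    Since nothing is cached in this sketch, collecting twice runs the mapping function twice per element, which mirrors the remark under map() that the function may be called several times on the same element.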

    Author:
    Antonin Delpeuch
    • Field Detail

      • id

        protected final long id
      • name

        protected final String name
      • cachedPartitions

        protected io.vavr.collection.Array<io.vavr.collection.Array<T>> cachedPartitions
    • Method Detail

      • compute

        protected abstract CloseableIterator<T> compute(Partition partition)
        Iterates over the elements of the given partition. This is the method that subclasses should implement. Because this method always computes the contents and ignores any caching, consumers should not call it directly but use iterate(Partition) instead. Once the iterator is no longer needed, it should be closed, which releases the underlying resources supporting it, such as open files or sockets.
        Parameters:
        partition - the partition to iterate over
        Returns:
        an iterator over the elements of the partition
      • numPartitions

        public int numPartitions()
        Returns:
        the number of partitions in this list
      • getPartitions

        public abstract io.vavr.collection.Array<? extends Partition> getPartitions()
        Returns:
        the partitions in this list
      • iterate

        public CloseableIterator<T> iterate(Partition partition)
        Iterates over the elements of the given partition. If the contents of this PLL have been cached, this iterates from the cache instead.
        Parameters:
        partition - the partition to iterate over
        Returns:
        an iterator over the elements of the partition, to be closed once it is no longer needed
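        The division of labour between compute and iterate can be sketched as follows. This is a simplified, hypothetical model rather than the PLL source: partitions are identified by integer indices instead of Partition objects, and SketchCloseableIterator stands in for CloseableIterator. Subclasses supply compute, consumers call iterate, which reads from the cache when one is present, and iterators are closed with try-with-resources once exhausted.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for CloseableIterator: an iterator that may hold resources.
interface SketchCloseableIterator<T> extends Iterator<T>, AutoCloseable {
    @Override
    void close(); // narrowed so that callers need not handle checked exceptions

    // Adapts an in-memory iterator, which has nothing to release.
    static <E> SketchCloseableIterator<E> wrap(Iterator<E> it) {
        return new SketchCloseableIterator<E>() {
            @Override
            public boolean hasNext() {
                return it.hasNext();
            }

            @Override
            public E next() {
                return it.next();
            }

            @Override
            public void close() {
            }
        };
    }
}

abstract class CachingListSketch<T> {
    private List<List<T>> cache; // one list per partition, or null if not cached

    // Implemented by subclasses: always computes, never looks at the cache.
    protected abstract SketchCloseableIterator<T> compute(int partition);

    protected abstract int numPartitions();

    // Called by consumers: serves cached contents when they are available.
    public SketchCloseableIterator<T> iterate(int partition) {
        if (cache != null) {
            return SketchCloseableIterator.wrap(cache.get(partition).iterator());
        }
        return compute(partition);
    }

    // Computes every partition once and keeps the results in memory.
    public void cache() {
        List<List<T>> contents = new ArrayList<>();
        for (int i = 0; i < numPartitions(); i++) {
            List<T> elements = new ArrayList<>();
            // try-with-resources closes the iterator, releasing its resources.
            try (SketchCloseableIterator<T> it = compute(i)) {
                it.forEachRemaining(elements::add);
            }
            contents.add(elements);
        }
        cache = contents;
    }
}

// Example subclass: partition p holds 10p, 10p + 1 and 10p + 2.
final class RangeListSketch extends CachingListSketch<Integer> {
    int computeCalls = 0;

    @Override
    protected SketchCloseableIterator<Integer> compute(int partition) {
        computeCalls++;
        int base = 10 * partition;
        return SketchCloseableIterator.wrap(List.of(base, base + 1, base + 2).iterator());
    }

    @Override
    protected int numPartitions() {
        return 2;
    }
}
```

        After cache() has run, further calls to iterate no longer invoke compute, which is why consumers are directed to iterate rather than compute.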
      • count

        public long count()
        Returns:
        the total number of elements
      • getPartitionSizes

        public final io.vavr.collection.Array<Long> getPartitionSizes()
        Returns the number of elements in each partition. See hasCachedPartitionSizes() to check whether those sizes are already computed. Subclasses should override computePartitionSizes() instead if they can compute the sizes more efficiently than by iterating over the partitions.
      • computePartitionSizes

        protected io.vavr.collection.Array<Long> computePartitionSizes()
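        A minimal sketch of how a final getPartitionSizes() can delegate to an overridable computePartitionSizes(), assuming the sizes are memoized after the first computation, as hasCachedPartitionSizes() suggests. The classes below are hypothetical, and java.util.List stands in for io.vavr.collection.Array. The default strategy counts elements by iterating; a subclass that knows its sizes more cheaply would override computePartitionSizes().

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

abstract class PartitionSizesSketch<T> {
    private List<Long> cachedSizes; // null until computed

    protected abstract int numPartitions();

    protected abstract Iterator<T> iterate(int partition);

    // Final: the sizes are computed at most once and then reused.
    public final List<Long> getPartitionSizes() {
        if (cachedSizes == null) {
            cachedSizes = computePartitionSizes();
        }
        return cachedSizes;
    }

    public boolean hasCachedPartitionSizes() {
        return cachedSizes != null;
    }

    // Default strategy: enumerate each partition and count its elements.
    // Subclasses that know their sizes more cheaply should override this.
    protected List<Long> computePartitionSizes() {
        List<Long> sizes = new ArrayList<>();
        for (int i = 0; i < numPartitions(); i++) {
            long count = 0;
            Iterator<T> it = iterate(i);
            while (it.hasNext()) {
                it.next();
                count++;
            }
            sizes.add(count);
        }
        return sizes;
    }
}

// Example subclass over in-memory lists that relies on the default counting.
final class ListSizesSketch<T> extends PartitionSizesSketch<T> {
    private final List<List<T>> data;

    ListSizesSketch(List<List<T>> data) {
        this.data = data;
    }

    @Override
    protected int numPartitions() {
        return data.size();
    }

    @Override
    protected Iterator<T> iterate(int partition) {
        return data.get(partition).iterator();
    }
}
```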
      • collect

        public io.vavr.collection.Array<T> collect()
        Returns:
        the list of all elements in the list, retrieved in memory.
      • collectPartitionsAsync

        protected ProgressingFuture<io.vavr.collection.Array<io.vavr.collection.Array<T>>> collectPartitionsAsync()
        Retrieves the contents of all partitions. This does not store them in the local cache, so two successive calls to this method will enumerate the contents of the PLL twice.
      • iterateFromPartition

        protected CloseableIterator<T> iterateFromPartition(int partitionId)
        Streams over the part of the collection that starts at the given partition boundary.
        Parameters:
        partitionId - the index of the partition to start enumerating from
        Returns:
        an iterator over the elements of the given partition and of all the partitions after it
      • isEmpty

        public boolean isEmpty()
        Is this list empty?
      • take

        public io.vavr.collection.List<T> take(int num)
        Returns the first num elements of the list (or fewer, if the list contains fewer elements).
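        One plausible way to implement take is to enumerate partitions in order and stop as soon as enough elements have been gathered, so that later partitions are never computed. The sketch below is hypothetical: it works on in-memory iterables and returns a java.util.List instead of a vavr list.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

final class TakeSketch {
    // Returns the first n elements (or all of them if there are fewer),
    // enumerating partitions in order and stopping as soon as n elements
    // are gathered, so that later partitions are never iterated.
    static <T> List<T> take(List<? extends Iterable<T>> partitions, int n) {
        List<T> result = new ArrayList<>();
        for (Iterable<T> partition : partitions) {
            if (result.size() >= n) {
                break;
            }
            Iterator<T> it = partition.iterator();
            while (result.size() < n && it.hasNext()) {
                result.add(it.next());
            }
        }
        return result;
    }
}
```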
      • aggregate

        public <U> U aggregate(U initialValue,
                               BiFunction<U,T,U> map,
                               BiFunction<U,U,U> combine)
        Runs an associative aggregation function on the list.
        Type Parameters:
        U - the type of the aggregated value
        Parameters:
        initialValue - the neutral value for the combine operation
        map - a function taking the current state and a list element, and returning the updated state
        combine - the associative operator
        Returns:
        the aggregated value over the entire list
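        The roles of the two functions can be seen in a sequential sketch: map folds the elements of each partition starting from the neutral value, and combine merges the per-partition results. Because initialValue is used once per partition and once more for the final merge, it must be neutral for combine, otherwise it would be counted several times. AggregateSketch is hypothetical; the real method may process partitions in parallel. The sketch also assumes that map returns new values rather than mutating initialValue in place.

```java
import java.util.List;
import java.util.function.BiFunction;

final class AggregateSketch {
    // Folds each partition with `map`, starting from the neutral value, then
    // merges the per-partition results in order with the associative `combine`.
    // The per-partition folds are independent, so they could run concurrently;
    // here they run sequentially for clarity.
    static <T, U> U aggregate(List<List<T>> partitions, U initialValue,
                              BiFunction<U, T, U> map, BiFunction<U, U, U> combine) {
        U result = initialValue;
        for (List<T> partition : partitions) {
            U partial = initialValue;
            for (T element : partition) {
                partial = map.apply(partial, element);
            }
            result = combine.apply(result, partial);
        }
        return result;
    }
}
```

        For instance, aggregating with 0, (acc, s) -> acc + s.length() and Integer::sum over the partitions [["a", "bb"], ["ccc"]] yields the total length 6, regardless of how the strings are split into partitions.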
      • map

        public <U> PLL<U> map(Function<T,U> mapFunction,
                              String mapDescription)
        Derives a new PLL by applying a map function on each element of this list. The function is applied lazily, so it can be called multiple times on the same element, depending on the actions called on the returned PLL.
        Type Parameters:
        U - the type of the elements of the new list
        Parameters:
        mapFunction - the function to apply on each element
        mapDescription - a short description of the function, for debugging purposes
        Returns:
        a new PLL containing the mapped elements
      • flatMap

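        public <U> PLL<U> flatMap(Function<T,CloseableIterator<U>> mapFunction,
                                  String mapDescription)
        Derives a new PLL by applying a map function on each element, which can return multiple elements of the new list. The function is applied lazily, so it can be called multiple times on the same element, depending on the actions called on the returned PLL.
        Type Parameters:
        U - the type of the elements of the new list
        Parameters:
        mapFunction - the function returning, for each element, an iterator over the elements to include in the new list
        mapDescription - a short description of the function, for debugging purposes
        Returns:
        a new PLL containing the concatenation of the elements returned by the function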
      • batchPartitions

        public PLL<List<T>> batchPartitions(int batchSize)
        Groups elements into batches of the desired size, within each partition. At the end of a partition, a smaller batch may be created even if the following partitions contain more elements.
        Parameters:
        batchSize - the desired maximal size of batches
        Returns:
        a PLL of batches, none of which spans more than one partition
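        The partition-local behaviour of batchPartitions is easiest to see on in-memory data. In this hypothetical sketch, with batchSize 2 the partitions [1, 2, 3] and [4, 5] give the batches [1, 2], [3] and [4, 5]: the element 3 is not grouped with 4 because batches never cross a partition boundary.

```java
import java.util.ArrayList;
import java.util.List;

final class BatchSketch {
    // Splits each partition independently into consecutive batches of at most
    // batchSize elements. Batches never span two partitions, so the last batch
    // of a partition may be smaller than batchSize.
    static <T> List<List<List<T>>> batchPartitions(List<List<T>> partitions, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<List<T>>> result = new ArrayList<>();
        for (List<T> partition : partitions) {
            List<List<T>> batches = new ArrayList<>();
            for (int start = 0; start < partition.size(); start += batchSize) {
                int end = Math.min(start + batchSize, partition.size());
                batches.add(new ArrayList<>(partition.subList(start, end)));
            }
            result.add(batches);
        }
        return result;
    }
}
```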
      • mapPartitions

        public <U> PLL<U> mapPartitions(BiFunction<Integer,CloseableIterator<T>,CloseableIterator<U>> map,
                                        String mapDescription,
                                        boolean preservesSizes)
        Maps each partition by applying an arbitrary function to it.
        Type Parameters:
        U - the type of the elements of the new list
        Parameters:
        map - the function to apply on the stream of elements of the partition
        mapDescription - a short description of the map function, for debugging purposes
        preservesSizes - whether the size of each partition will be preserved by the map
        Returns:
        a new PLL whose partitions are obtained by applying the function to the partitions of this one
      • scanMap

        public <S,U> PLL<U> scanMap(S initialState,
                                          Function<T,S> feed,
                                          BiFunction<S,S,S> combine,
                                          BiFunction<S,T,U> map)
        Applies a map function on the list, such that the map function is able to keep a state from one element to the other. This state is required to be combinable with an associative and unital function.
        Type Parameters:
        S - the type of the state kept by the mapper
        U - the type of elements returned by the mapper
        Parameters:
        initialState - the initial state
        feed - the function to update the state after each element
        combine - the function to combine two states, at partition boundaries
        map - the mapper itself
        Returns:
        a new PLL containing the values returned by the mapper
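        Because combine is associative and initialState is its unit, scanMap can be computed in three passes, two of which are partition-local and therefore parallelizable: summarize each partition by folding feed over it, combine these summaries in order to obtain the state at the start of each partition, then run the mapper within each partition from its start state. The sketch below is a sequential, hypothetical illustration on in-memory partitions, not the PLL code. It assumes, as the description of feed suggests, that map receives the state accumulated from the elements strictly before the current one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;

final class ScanMapSketch {
    static <T, S, U> List<List<U>> scanMap(List<List<T>> partitions, S initialState,
            Function<T, S> feed, BiFunction<S, S, S> combine, BiFunction<S, T, U> map) {
        // Pass 1: summarize each partition independently (parallelizable).
        List<S> summaries = new ArrayList<>();
        for (List<T> partition : partitions) {
            S state = initialState;
            for (T element : partition) {
                state = combine.apply(state, feed.apply(element));
            }
            summaries.add(state);
        }
        // Pass 2: combine the summaries in order to get each partition's start state.
        List<S> startStates = new ArrayList<>();
        S running = initialState;
        for (S summary : summaries) {
            startStates.add(running);
            running = combine.apply(running, summary);
        }
        // Pass 3: map each partition from its start state (parallelizable).
        List<List<U>> result = new ArrayList<>();
        for (int i = 0; i < partitions.size(); i++) {
            S state = startStates.get(i);
            List<U> mapped = new ArrayList<>();
            for (T element : partitions.get(i)) {
                mapped.add(map.apply(state, element)); // state of the preceding elements
                state = combine.apply(state, feed.apply(element));
            }
            result.add(mapped);
        }
        return result;
    }
}
```

        With initialState 0L, feed s -> 1L, combine Long::sum and map (count, s) -> s + count, the partitions [["a", "b"], ["c"]] become [["a0", "b1"], ["c2"]]: rows are numbered consistently across partition boundaries.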
      • filter

        public PLL<T> filter(Predicate<? super T> filterPredicate)
        Derives a new PLL by filtering the collection to only contain elements which match the supplied predicate. The predicate is evaluated lazily, so it can be called multiple times on the same element, depending on the actions called on the returned PLL.
        Parameters:
        filterPredicate - the predicate that elements must satisfy to be retained
        Returns:
        a new PLL containing only the elements matching the predicate
      • mapToPair

        @Deprecated
        public <K,V> PairPLL<K,V> mapToPair(Function<T,Tuple2<K,V>> mapFunction)
        Deprecated.
        Use mapToPair(Function, String) to also provide a description of the map function applied.
        Maps this collection to an indexed PLL. This does not come with any partitioner, so it is only indexed in the sense that it offers specific methods for collections of pairs.
        Type Parameters:
        K - the type of the keys
        V - the type of the values
        Parameters:
        mapFunction - the function to apply on each element
        Returns:
        a PairPLL containing the pairs returned by the function
      • mapToPair

        public <K,V> PairPLL<K,V> mapToPair(Function<T,Tuple2<K,V>> mapFunction,
                                                        String mapDescription)
        Maps this collection to an indexed PLL. This does not come with any partitioner, so it is only indexed in the sense that it offers specific methods for collections of pairs.
        Type Parameters:
        K - the type of the keys
        V - the type of the values
        Parameters:
        mapFunction - the function to apply on each element
        mapDescription - a short description of the map function being applied, for debugging purposes
        Returns:
        a PairPLL containing the pairs returned by the function
      • zipWithIndex

        public PairPLL<Long,T> zipWithIndex()
        Indexes the collection in sequential order. This creates a partitioner, making it efficient to retrieve an element by index with PairPLL.get(K).
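        Sequential indexing only needs the partition sizes: the index of the first element of each partition is the sum of the sizes of the partitions before it. The hypothetical sketch below computes these offsets, uses them to index each partition independently, and maps an index back to its partition. It illustrates the idea behind the partitioner mentioned above without claiming to reproduce PairPLL's implementation.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class ZipWithIndexSketch {
    // Exclusive prefix sum: the index of the first element of each partition.
    static List<Long> partitionOffsets(List<Long> partitionSizes) {
        List<Long> offsets = new ArrayList<>();
        long running = 0;
        for (long size : partitionSizes) {
            offsets.add(running);
            running += size;
        }
        return offsets;
    }

    // Pairs every element with its global index; each partition only needs its offset.
    static <T> List<List<Map.Entry<Long, T>>> zipWithIndex(List<List<T>> partitions) {
        List<Long> sizes = new ArrayList<>();
        for (List<T> partition : partitions) {
            sizes.add((long) partition.size());
        }
        List<Long> offsets = partitionOffsets(sizes);
        List<List<Map.Entry<Long, T>>> result = new ArrayList<>();
        for (int i = 0; i < partitions.size(); i++) {
            long index = offsets.get(i);
            List<Map.Entry<Long, T>> indexed = new ArrayList<>();
            for (T element : partitions.get(i)) {
                indexed.add(new SimpleImmutableEntry<>(index, element));
                index++;
            }
            result.add(indexed);
        }
        return result;
    }

    // Finds the partition holding a global index (linear scan; a binary search
    // over the offsets would be faster for many partitions).
    static int partitionOf(List<Long> offsets, List<Long> sizes, long index) {
        for (int i = 0; i < offsets.size(); i++) {
            if (index >= offsets.get(i) && index < offsets.get(i) + sizes.get(i)) {
                return i;
            }
        }
        throw new IndexOutOfBoundsException("No element at index " + index);
    }
}
```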
      • sort

        public PLL<T> sort(Comparator<T> comparator)
        Sorts the collection using the supplied comparator. This fetches the entire collection in memory.
        Parameters:
        comparator - the comparator defining the order of the elements
        Returns:
        a new PLL containing the sorted elements
      • concatenate

        public PLL<T> concatenate(PLL<T> other)
        Concatenates another PLL at the end of this one, resulting in a new PLL. The partitions of the new PLL are the union of the partitions of both original PLLs.
        Parameters:
        other - the list of elements to add at the end of this one
      • concatenate

        public PLL<T> concatenate(List<PLL<T>> others)
        Concatenates other PLLs at the end of this one, resulting in a new PLL. The partitions of the new PLL are the union of the partitions of all original PLLs.
        Parameters:
        others - the list of PLLs to add at the end of this one
      • limitPartitions

        public PLL<T> limitPartitions(long limit)
        Limits each partition to its first N elements, where N is the given limit.
        Parameters:
        limit - the maximum number of items per partition
        Returns:
        a new PLL in which each partition contains at most limit elements
      • retainPartitions

        public PLL<T> retainPartitions(List<Integer> partitionIds)
        Only retain partitions designated by the given list of indices. The other partitions are dropped from the PLL. The partitions in the resulting PLL are ordered according to the list of indices supplied.
        Parameters:
        partitionIds - the indices of the partitions to retain
      • dropFirstElements

        public PLL<T> dropFirstElements(long n)
        Drops the first n elements of the collection.
        Parameters:
        n - the number of elements to remove
        Returns:
        a new PLL without the first n elements of this one
      • dropLastElements

        public PLL<T> dropLastElements(long n)
        Drops the last n elements of the collection.
        Parameters:
        n - the number of elements to remove at the end
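        When the partition sizes are known, dropping elements at either end can be planned without reading any element: walk the sizes from the relevant end and subtract until n is exhausted. In the hypothetical sketch below, each array gives the number of elements to remove from each partition; a partition whose count equals its size can be dropped entirely, and one with a smaller positive count only needs to be cropped. Whether PLL plans its drops exactly this way is an assumption.

```java
import java.util.List;

final class DropSketch {
    // dropFirstElements(n): number of leading elements to remove from each partition.
    static long[] leadingSkips(List<Long> sizes, long n) {
        long[] skips = new long[sizes.size()];
        long remaining = n;
        for (int i = 0; i < sizes.size() && remaining > 0; i++) {
            skips[i] = Math.min(remaining, sizes.get(i));
            remaining -= skips[i];
        }
        return skips;
    }

    // dropLastElements(n): number of trailing elements to remove from each partition.
    static long[] trailingCuts(List<Long> sizes, long n) {
        long[] cuts = new long[sizes.size()];
        long remaining = n;
        for (int i = sizes.size() - 1; i >= 0 && remaining > 0; i--) {
            cuts[i] = Math.min(remaining, sizes.get(i));
            remaining -= cuts[i];
        }
        return cuts;
    }
}
```

        For partition sizes [2, 3, 4], dropping the first 4 elements gives [2, 2, 0], and dropping the last 5 gives [0, 1, 4].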
      • cacheAsync

        public ProgressingFuture<Void> cacheAsync()
        Loads the contents of all partitions in memory.
      • uncache

        public void uncache()
        Unloads the partition contents from memory.
      • isCached

        public boolean isCached()
        Are the contents of this collection loaded in memory?
      • hasCachedPartitionSizes

        public boolean hasCachedPartitionSizes()
        Is this PLL aware of the size of its partitions?
      • withCachedPartitionSizes

        public PLL<T> withCachedPartitionSizes(io.vavr.collection.Array<Long> partitionSizes)
        Sets the partition sizes, if they are already known by the user.
        Parameters:
        partitionSizes - the number of elements in each partition
        Returns:
        the PLL with its partition sizes set
      • saveAsTextFileAsync

        public ProgressingFuture<Void> saveAsTextFileAsync(String path,
                                                           int maxConcurrency,
                                                           boolean repartition,
                                                           boolean flushRegularly)
        Writes the PLL to a directory asynchronously, with one file per partition.
        Parameters:
        path - the directory to which to write the PLL
        maxConcurrency - the maximum number of partitions to write concurrently
        repartition - whether to rearrange partitions to make them more balanced (merging small partitions together, splitting large partitions into smaller ones)
        flushRegularly - whether files should be flushed to disk frequently (at the cost of a lower compression ratio and more disk writes)
        Returns:
        a future for the saving process, which supports progress reporting and pausing.
      • writePlannedPartition

        protected void writePlannedPartition(org.openrefine.runners.local.pll.PLL.PlannedPartition partition,
                                             File directory,
                                             io.vavr.collection.Iterator<T> choppedIterator,
                                             Optional<TaskSignalling> taskSignalling,
                                             boolean flushRegularly)
                                      throws IOException
        Throws:
        IOException - if writing the partition to disk fails
      • runOnPartitions

        public <U> io.vavr.collection.Array<U> runOnPartitions(Function<Partition,U> partitionFunction,
                                                               int maxConcurrency)
                                                        throws InterruptedException
        Runs a task in parallel on all partitions.
        Type Parameters:
        U - return type of the function to be applied to all partitions
        Parameters:
        partitionFunction - the function to be applied to all partitions
        maxConcurrency - the maximum number of tasks to run in parallel. Set to 0 for no limit.
        Returns:
        the results of the function, one per partition
        Throws:
        InterruptedException - if the thread is interrupted while waiting for the tasks to complete
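        The contract of runOnPartitions (bounded parallelism, 0 meaning no limit, one result per partition) can be sketched with a standard ExecutorService. RunOnPartitionsSketch is hypothetical: partitions are arbitrary objects, results are returned in a java.util.List in partition order, and failures of the function are rethrown as unchecked exceptions. The actual PLL method may schedule its tasks differently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

final class RunOnPartitionsSketch {
    // Applies partitionFunction to every partition on a thread pool, running at
    // most maxConcurrency tasks at a time (0 means no limit), and returns the
    // results in partition order.
    static <P, U> List<U> runOnPartitions(List<P> partitions, Function<P, U> partitionFunction,
            int maxConcurrency) throws InterruptedException {
        List<U> results = new ArrayList<>();
        if (partitions.isEmpty()) {
            return results;
        }
        int threads = maxConcurrency <= 0
                ? partitions.size()
                : Math.min(maxConcurrency, partitions.size());
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        try {
            List<Future<U>> futures = new ArrayList<>();
            for (P partition : partitions) {
                futures.add(executor.submit(() -> partitionFunction.apply(partition)));
            }
            for (Future<U> future : futures) {
                try {
                    results.add(future.get()); // blocks until this partition is done
                } catch (ExecutionException e) {
                    // Surface the failure of the task as an unchecked exception.
                    throw new RuntimeException(e.getCause());
                }
            }
            return results;
        } finally {
            executor.shutdownNow(); // also interrupts tasks left running after a failure
        }
    }
}
```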
      • runOnPartitionsAsync

        public <U> ProgressingFuture<io.vavr.collection.Array<U>> runOnPartitionsAsync(BiFunction<Partition,TaskSignalling,U> partitionFunction,
                                                                                       int maxConcurrency)
        Runs a task in parallel on all partitions, asynchronously.
        Type Parameters:
        U - return type of the function to be applied to all partitions
        Parameters:
        partitionFunction - the function to be applied to all partitions
        maxConcurrency - the maximum number of tasks to run in parallel. Set to 0 for no limit.
        Returns:
        a future for the results of the function, one per partition
      • runOnPartitions

        protected <U> io.vavr.collection.Array<U> runOnPartitions(Function<Partition,U> partitionFunction,
                                                                  io.vavr.collection.Iterator<? extends Partition> partitions,
                                                                  int maxConcurrency)
                                                           throws InterruptedException
        Runs a task in parallel on a selection of partitions.
        Type Parameters:
        U - return type of the function to be applied to the selected partitions
        Parameters:
        partitionFunction - the function to be applied to each selected partition
        partitions - the partitions to apply the function on
        maxConcurrency - the maximum number of tasks to run in parallel. Set to 0 for no limit.
        Returns:
        the results of the function, one per selected partition
        Throws:
        InterruptedException - if the thread is interrupted while waiting for the tasks to complete
      • runOnPartitionsAsync

        protected <U> ProgressingFuture<io.vavr.collection.Array<U>> runOnPartitionsAsync(BiFunction<Partition,TaskSignalling,U> partitionFunction,
                                                                                          io.vavr.collection.Iterator<? extends Partition> partitions,
                                                                                          int maxConcurrency)
        Runs a task in parallel on a selection of partitions, asynchronously.
        Type Parameters:
        U - return type of the function to be applied to all partitions
        Parameters:
        partitionFunction - the function to be applied to all partitions
        partitions - the partitions to apply the function on
        maxConcurrency - the maximum number of tasks to run in parallel. Set to 0 for no limit.
      • getId

        public long getId()
        Returns:
        a numerical id for the PLL allocated by its context
      • getParents

        public abstract List<PLL<?>> getParents()
        Returns the PLLs that this PLL depends on, to compute its contents. This is used for debugging purposes, to display the tree of dependencies of a given PLL.
        See Also:
        getQueryTree()
      • getQueryTree

        public QueryTree getQueryTree()
        Returns:
        a tree-based representation of the dependencies of this PLL.