Simplifying the writing of custom iterators in Java

Keywords: custom iterator Java | Updated: 2023-09-27 18:00:21

Writing an iterator for a custom collection in Java is fairly involved, because you essentially have to write a state machine:

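import java.util.Iterator;
import java.util.NoSuchElementException;

public class CustomCollection<T> implements Iterable<T>
{
    private T[] data;
    private int size;
    @Override
    public Iterator<T> iterator()
    {
        return new Iterator<T>()
        {
            // The whole state of this "state machine": the index of the next element.
            private int cursor = 0;
            @Override
            public boolean hasNext()
            {
                return cursor < size;
            }
            @Override
            public T next()
            {
                // The Iterator contract requires NoSuchElementException past the end.
                if (!hasNext()) throw new NoSuchElementException();
                return data[cursor++];
            }
            @Override
            public void remove()
            {
                throw new UnsupportedOperationException();
            }
        };
    }
    // ...
}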

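For collections more complex than an array list or a linked list, getting these state machines right is a daunting task. In fact, the C# design team considered writing custom iterators complicated enough to justify dedicated language support (yield return), which lets the compiler build the state machine.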
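Here is a concrete case of that difficulty: a hand-written in-order iterator for a binary tree. A recursive traversal keeps its position on the call stack. An iterator cannot do that, so the recursion has to become an explicit stack that remembers where the traversal stopped. This is a sketch only. The `Node` class and the `InOrderIterator` name are hypothetical and were picked for illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.NoSuchElementException;

public class InOrderIterator<T> implements Iterator<T> {
    // Hypothetical minimal tree node, used only for this example.
    public static final class Node<T> {
        final T value;
        final Node<T> left, right;

        public Node(Node<T> left, T value, Node<T> right) {
            this.left = left;
            this.value = value;
            this.right = right;
        }
    }

    // The explicit stack is the state machine: it holds every ancestor
    // whose value has not been returned yet.
    private final Deque<Node<T>> stack = new ArrayDeque<Node<T>>();

    public InOrderIterator(Node<T> root) {
        pushLeftSpine(root);
    }

    // Descend to the leftmost node, remembering the path on the stack.
    private void pushLeftSpine(Node<T> node) {
        for (Node<T> n = node; n != null; n = n.left) {
            stack.push(n);
        }
    }

    @Override
    public boolean hasNext() {
        return !stack.isEmpty();
    }

    @Override
    public T next() {
        if (stack.isEmpty()) throw new NoSuchElementException();
        Node<T> node = stack.pop();
        // Before returning, set up the traversal of the right subtree.
        pushLeftSpine(node.right);
        return node.value;
    }

    @Override
    public void remove() {
        throw new UnsupportedOperationException();
    }
}
```

With yield return, the same traversal would be an ordinary recursive method with a yield at the point where the value is visited.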

Will something like yield return show up in the next version of Java? If not, is there a library that makes writing my own iterators in Java easier?


No, Java has nothing like yield. As for libraries, Guava has several useful classes that make certain kinds of iterators easy to write:

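  • AbstractIterator only requires you to implement a single method, T computeNext()
  • AbstractLinkedIterator requires you to implement T computeNext(T previous) (later Guava releases replaced it with AbstractSequentialIterator)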
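The computeNext(T previous) variant suits sequences in which each element is derived from the one before it. The sketch below reimplements the idea in plain Java so that it runs without Guava. `SequentialIterator` is a hypothetical name. Ending the sequence by returning null follows the convention of Guava's sequential iterator, but the code is an assumption-based sketch and not Guava's source.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical stand-in for a computeNext(T previous)-style base class.
public abstract class SequentialIterator<T> implements Iterator<T> {
    private T nextOrNull; // null means the sequence is exhausted

    protected SequentialIterator(T firstOrNull) {
        this.nextOrNull = firstOrNull;
    }

    // Returns the element that follows 'previous', or null if there is none.
    protected abstract T computeNext(T previous);

    @Override
    public final boolean hasNext() {
        return nextOrNull != null;
    }

    @Override
    public final T next() {
        if (nextOrNull == null) throw new NoSuchElementException();
        T result = nextOrNull;
        // The only state kept between calls is the next element.
        nextOrNull = computeNext(result);
        return result;
    }

    @Override
    public void remove() {
        throw new UnsupportedOperationException();
    }
}
```

Here is an example. Take an anonymous subclass constructed with `1` whose `computeNext` returns `previous >= 64 ? null : previous * 2`. It yields 1, 2, 4, 8, 16, 32 and 64, then stops. Because null marks the end, this style cannot produce null elements.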

AbstractIterator can be used like this:

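// Inside CustomCollection.iterator(); requires com.google.common.collect.AbstractIterator.
return new AbstractIterator<T>() {
  private int index = 0;
  @Override
  protected T computeNext() {
    // endOfData() marks the end of iteration; the value it returns is only a placeholder.
    return index == size ? endOfData() : data[index++];
  }
};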
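The base class takes over nearly all of the bookkeeping that the hand-written iterator in the question had to do. As an illustration, here is a minimal, self-contained base class in the spirit of Guava's AbstractIterator that runs without Guava. The name `SimpleAbstractIterator` is hypothetical. This is not Guava's implementation, which handles more edge cases.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical simplified base class: subclasses implement computeNext()
// and call endOfData() once there are no more elements.
public abstract class SimpleAbstractIterator<T> implements Iterator<T> {
    private enum State { NOT_READY, READY, DONE }

    private State state = State.NOT_READY;
    private T next;

    protected abstract T computeNext();

    // Marks the end of iteration. The returned null is only a convenience,
    // so that computeNext() can write "return endOfData();".
    protected final T endOfData() {
        state = State.DONE;
        return null;
    }

    @Override
    public final boolean hasNext() {
        if (state == State.NOT_READY) {
            T candidate = computeNext(); // may call endOfData()
            if (state != State.DONE) {
                next = candidate;
                state = State.READY;
            }
        }
        return state == State.READY;
    }

    @Override
    public final T next() {
        if (!hasNext()) throw new NoSuchElementException();
        state = State.NOT_READY;
        T result = next;
        next = null;
        return result;
    }

    @Override
    public void remove() {
        throw new UnsupportedOperationException();
    }
}
```

An anonymous subclass whose computeNext() is exactly the one shown above works unchanged against this base class. Because the end is signalled through the separate state field rather than a null return, null elements are still allowed.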

You could also use Arrays.asList, as Amir suggested, or even do this:

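private final List<T> listView = new AbstractList<T>() {
  @Override
  public int size() {
    return size; // only the occupied part of data, not data.length
  }
  @Override
  public T get(int index) {
    if (index < 0 || index >= size) throw new IndexOutOfBoundsException("Index: " + index);
    return data[index];
  }
};
@Override
public Iterator<T> iterator() {
  return listView.iterator();
}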
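Extending AbstractList pays off because get and size are the only methods you have to write. AbstractList builds the iterator, contains, indexOf, equals and toString on top of them. The self-contained sketch below shows this. The class name `ArrayBackedCollection` and its `asList()` accessor are hypothetical. The view is initialized in the constructor so that it is created after the final fields it reads.

```java
import java.util.AbstractList;
import java.util.Iterator;
import java.util.List;

public class ArrayBackedCollection<T> implements Iterable<T> {
    private final T[] data;
    private final int size; // number of slots of data actually in use
    private final List<T> listView;

    public ArrayBackedCollection(T[] data, int size) {
        if (size < 0 || size > data.length) {
            throw new IllegalArgumentException("size out of range: " + size);
        }
        this.data = data;
        this.size = size;
        // Read-only view: AbstractList derives iterator(), contains(),
        // indexOf(), equals() and toString() from get() and size().
        this.listView = new AbstractList<T>() {
            @Override
            public int size() {
                return ArrayBackedCollection.this.size;
            }

            @Override
            public T get(int index) {
                if (index < 0 || index >= ArrayBackedCollection.this.size) {
                    throw new IndexOutOfBoundsException("Index: " + index);
                }
                return ArrayBackedCollection.this.data[index];
            }
        };
    }

    public List<T> asList() {
        return listView;
    }

    @Override
    public Iterator<T> iterator() {
        return listView.iterator();
    }
}
```

The iterator inherited from AbstractList also checks modCount. That check has no effect here, because the view cannot be modified: add, set and remove throw UnsupportedOperationException by default.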

Maybe I'm just not understanding your question, but couldn't you simply do return Arrays.asList(data).iterator()?
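One caveat applies to Arrays.asList(data).iterator(). It visits all data.length slots, including any spare capacity beyond size, which in the question's class would produce stale or null entries. A subList view limits iteration to the occupied prefix without copying. The following minimal sketch assumes the data and size fields from the question. The class name `SizedArrayCollection` is hypothetical.

```java
import java.util.Arrays;
import java.util.Iterator;

public class SizedArrayCollection<T> implements Iterable<T> {
    private final T[] data; // backing array, possibly with spare capacity
    private final int size; // number of slots actually in use

    public SizedArrayCollection(T[] data, int size) {
        if (size < 0 || size > data.length) {
            throw new IllegalArgumentException("size out of range: " + size);
        }
        this.data = data;
        this.size = size;
    }

    @Override
    public Iterator<T> iterator() {
        // Arrays.asList wraps the array without copying, and subList(0, size)
        // hides the unused tail. The list is fixed-size, so the iterator's
        // remove() throws UnsupportedOperationException.
        return Arrays.asList(data).subList(0, size).iterator();
    }
}
```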

If you don't mind starting a new thread, you can use a SynchronousQueue:

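import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

public abstract class InternalIterator<T> implements Iterator<T> {
    private final SynchronousQueue<T> queue = new SynchronousQueue<T>();
    private volatile boolean empty = false;
    private T current = null;
    private boolean started = false;

    // Iterate over the underlying data here and call yieldReturn(e) for each element.
    protected abstract void produce() throws InterruptedException;

    // This is the "yield return": put() blocks until the consumer has taken
    // the element. SynchronousQueue does not accept null elements.
    protected final void yieldReturn(T t) throws InterruptedException {
        queue.put(t);
    }

    private class Runner implements Runnable { // runs in a daemon thread
        @Override
        public void run() {
            try {
                produce();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                empty = true; // no more elements will be handed over
            }
        }
    }

    @Override
    public boolean hasNext() {
        if (!started) { // start the producer lazily, on first use
            started = true;
            Thread runner = new Thread(new Runner());
            // Daemon, so the JVM can still exit; if the consumer stops early,
            // this thread stays blocked in put().
            runner.setDaemon(true);
            runner.start();
        }
        if (current != null) return true;
        try {
            while (!empty) {
                // Poll with a timeout to avoid a deadlock when the last element has
                // already been returned but empty has not been written yet.
                if ((current = queue.poll(100, TimeUnit.MILLISECONDS)) != null) {
                    return true;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return false;
    }

    @Override
    public T next() {
        if (!hasNext()) throw new NoSuchElementException();
        T tmp = current;
        current = null;
        return tmp;
    }

    @Override
    public void remove() {
        throw new UnsupportedOperationException();
    }
}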
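A SynchronousQueue can stand in for yield return because it has no capacity. A put blocks until another thread takes the element, so the producer pauses right after handing over each value. For the same reason, a plain offer() with no consumer waiting does not buffer anything; it simply fails. The self-contained demo below isolates this handoff. The class name `HandoffDemo` and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.SynchronousQueue;

public class HandoffDemo {
    // With no consumer waiting, offer() does not store the element;
    // it simply returns false.
    public static boolean offerWithoutConsumer() {
        return new SynchronousQueue<Integer>().offer(42);
    }

    // A producer thread hands 1..count to the calling (consumer) thread.
    // Each put() blocks until take() receives the value.
    public static List<Integer> transfer(final int count) {
        final SynchronousQueue<Integer> queue = new SynchronousQueue<Integer>();
        Thread producer = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    for (int i = 1; i <= count; i++) {
                        queue.put(i);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        producer.setDaemon(true);
        producer.start();

        List<Integer> received = new ArrayList<Integer>();
        try {
            for (int i = 0; i < count; i++) {
                received.add(queue.take());
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while receiving", e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(offerWithoutConsumer()); // prints false
        System.out.println(transfer(3));            // prints [1, 2, 3]
    }
}
```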

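Java has always had a mechanism for preserving state and resuming execution at a later point: threads. The basic idea of my library solution is to have a ConcurrentIterable produce the elements in one thread and a ConcurrentIterator consume them in another, with the two threads communicating through a bounded queue. (This is commonly known as the producer/consumer pattern.)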

First, here is what using it looks like:

public class CustomCollection<T> extends ConcurrentIterable<T>
{
    private T[] data;
    private int size;
    @Override
    protected void provideElements()
    {
        for (int i = 0; i < size; ++i)
        {
            provideElement(data[i]);
        }
    }
    // ...
}

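Note that there is no state machine at all. All you have to do is derive from ConcurrentIterable and implement the method provideElements. Inside that method you write straightforward code that calls provideElement for every element in the collection.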

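Sometimes the client does not iterate over the entire collection, for example in a linear search. By checking iterationAborted(), you can stop providing elements as soon as an abort is detected: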

    @Override
    protected void provideElements()
    {
        for (int i = 0; i < size && !iterationAborted(); ++i)
        {
            provideElement(data[i]);
        }
    }

If you don't mind some extra elements being produced, you can skip the iterationAborted() check. For infinite sequences, the check is mandatory.

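How does the producer detect that the consumer has stopped iterating? The consumer holds a strong reference to a token, and the producer holds a weak reference to the same token. When the consumer stops iterating and drops its iterator, the token becomes eligible for garbage collection, and eventually the producer can no longer see it. From then on, all new elements are discarded.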
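The token trick depends only on java.lang.ref.WeakReference. This small sketch shows the two sides on their own. While the consumer holds the token strongly, the producer's weak reference is guaranteed to still see it. Once the strong reference is gone, the reference is cleared at some later garbage collection, which System.gc() can encourage but not force. The names `TokenDemo` and `ProducerView` are hypothetical. `Reference.reachabilityFence` requires Java 9 or later.

```java
import java.lang.ref.Reference;
import java.lang.ref.WeakReference;

public class TokenDemo {
    // Producer side: sees the consumer's token only through a weak reference.
    static final class ProducerView {
        private final WeakReference<Object> token;

        ProducerView(Object token) {
            this.token = new WeakReference<Object>(token);
        }

        boolean iterationAborted() {
            return token.get() == null;
        }
    }

    // While the token is strongly reachable, the weak reference cannot be
    // cleared, even if a garbage collection runs in between.
    public static boolean abortedWhileHeld() {
        Object token = new Object(); // consumer side: strong reference
        ProducerView view = new ProducerView(token);
        System.gc();
        boolean aborted = view.iterationAborted();
        Reference.reachabilityFence(token); // keep token strongly reachable up to here
        return aborted;
    }

    // With no strong reference left, the token is cleared by some later
    // collection. System.gc() is only a hint, so a true result is likely
    // on common JVMs but not guaranteed.
    public static boolean abortedAfterDrop(int maxGcCalls) {
        ProducerView view = new ProducerView(new Object());
        for (int i = 0; i < maxGcCalls && !view.iterationAborted(); i++) {
            System.gc();
        }
        return view.iterationAborted();
    }

    public static void main(String[] args) {
        System.out.println("aborted while held: " + abortedWhileHeld());
        System.out.println("aborted after drop: " + abortedAfterDrop(10));
    }
}
```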

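(Without this precaution, the bounded queue could fill up in some cases. The producer would then loop forever, and the elements left in the queue would never be garbage collected.)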
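The producer's enqueue loop relies on BlockingQueue.offer(e, timeout, unit) returning false when a bounded queue stays full for the whole timeout, instead of blocking forever. That return gives the producer a chance to re-check whether iteration was aborted. The small demo below shows this behaviour. The class name `BoundedOfferDemo`, the capacity of 2 and the 10 ms timeout are arbitrary values chosen for the example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class BoundedOfferDemo {
    // Offers 'count' elements to a bounded queue that nobody consumes and
    // returns how many of the offers succeeded.
    public static int acceptedWithoutConsumer(int capacity, int count) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(capacity);
        int accepted = 0;
        try {
            for (int i = 0; i < count; i++) {
                // Waits at most 10 ms for free space, then gives up and returns false.
                if (queue.offer(i, 10, TimeUnit.MILLISECONDS)) {
                    accepted++;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return accepted;
    }

    public static void main(String[] args) {
        System.out.println(acceptedWithoutConsumer(2, 5)); // prints 2
    }
}
```

ConcurrentIterable makes the same call with a one-second timeout. When it returns false, the producer goes around the loop again: it calls System.gc() and then checks whether the token has disappeared before offering the element once more.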

Now for the implementation details:

ConcurrentIterable.java

import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
public abstract class ConcurrentIterable<T> implements Iterable<T>
{
    private static final int CAP = 1000;
    private final ThreadLocal<CommunicationChannel<T>> channels
    = new ThreadLocal<CommunicationChannel<T>>();
    @Override
    public Iterator<T> iterator()
    {
        BlockingQueue<Option<T>> queue = new ArrayBlockingQueue<Option<T>>(CAP);
        Object token = new Object();
        final CommunicationChannel<T> channel
        = new CommunicationChannel<T>(queue, token);
        new Thread(new Runnable()
        {
            @Override
            public void run()
            {
                channels.set(channel);
                provideElements();
                enqueueSentinel();
            }
        }).start();
        return new ConcurrentIterator<T>(queue, token);
    }
    protected abstract void provideElements();
    protected final boolean iterationAborted()
    {
        return channels.get().iterationAborted();
    }
    protected final void provideElement(T element)
    {
        enqueue(Option.some(element));
    }
    private void enqueueSentinel()
    {
        enqueue(Option.<T> none());
    }
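    private void enqueue(Option<T> element)
    {
        try
        {
            // offer() gives up after one second if the queue is full. In that case,
            // nudge the garbage collector so that an abandoned token gets cleared,
            // then retry (offer() discards the element once iteration is aborted).
            while (!offer(element))
            {
                System.gc();
            }
        }
        catch (InterruptedException e)
        {
            // Restore the interrupt flag instead of swallowing it.
            Thread.currentThread().interrupt();
        }
    }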
    private boolean offer(Option<T> element) throws InterruptedException
    {
        CommunicationChannel<T> channel = channels.get();
        return channel.iterationAborted()
            || channel.queue.offer(element, 1, TimeUnit.SECONDS);
    }
}

CommunicationChannel.java

import java.lang.ref.WeakReference;
import java.util.concurrent.BlockingQueue;
public class CommunicationChannel<T>
{
    public final BlockingQueue<Option<T>> queue;
    private final WeakReference<Object> token;
    public CommunicationChannel(BlockingQueue<Option<T>> queue, Object token)
    {
        this.queue = queue;
        this.token = new WeakReference<Object>(token);
    }
    public boolean iterationAborted()
    {
        return token.get() == null;
    }
}

ConcurrentIterator.java

import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
public class ConcurrentIterator<T> implements Iterator<T>
{
    private final BlockingQueue<Option<T>> queue;
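    // Held only to keep the token strongly reachable while this iterator is alive;
    // once the iterator is dropped, the producer's weak reference can be cleared.
    @SuppressWarnings("unused")
    private final Object token;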
    private Option<T> next;
    public ConcurrentIterator(BlockingQueue<Option<T>> queue, Object token)
    {
        this.queue = queue;
        this.token = token;
    }
    @Override
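    public boolean hasNext()
    {
        if (next == null)
        {
            try
            {
                // Blocks until the producer provides an element or the end sentinel.
                next = queue.take();
            }
            catch (InterruptedException e)
            {
                // With no element available there is nothing sensible to return,
                // so restore the interrupt flag and fail instead of dereferencing null.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for the next element", e);
            }
        }
        return next.present;
    }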
    @Override
    public T next()
    {
        if (!hasNext()) throw new NoSuchElementException();
        T result = next.value;
        next = null;
        return result;
    }
    @Override
    public void remove()
    {
        throw new UnsupportedOperationException();
    }
}

Option.java

public class Option<T>
{
    public final T value;
    public final boolean present;
    private Option(T value, boolean present)
    {
        this.value = value;
        this.present = present;
    }
    public static <T> Option<T> some(T value)
    {
        return new Option<T>(value, true);
    }
    @SuppressWarnings("unchecked")
    public static <T> Option<T> none()
    {
        return none;
    }
    @SuppressWarnings({ "rawtypes", "unchecked" })
    private static final Option none = new Option(null, false);
}

Let me know what you think!