我正在尝试在新的Spliterator类上发布一些教程。如今,有很多教程都是关于从标准Ja​​va集合开始使用流的,但是我想探索使用来自网络的数据创建流的方法(因此,没有流完全由集合支持)。 >我决定实现一个来自起始地址的无限URL流。我要做的是抓取初始页面,并向客户端代码提供在该页面中找到的URL。然后,我用新的URL重复抓取。返回给客户端的URL不会重复。

UlScraper

package com.stackexchange.codereview.webscrapingwithstream;

import java.io.IOException;
import java.util.*;
import java.util.function.Consumer;
import java.util.stream.*;

import org.jsoup.Jsoup;
import org.jsoup.nodes.*;
import org.jsoup.select.Elements;

public class UrlScraper {

    private String url;
    private Set<String> index = Collections.synchronizedSet(new TreeSet<>());
    private List<String> startingReferences = new ArrayList<>();

    public UrlScraper(String url) {
        this.url = url;
    }

    public Stream<String> stream() {
        startingReferences.add(url);
        index.add(url);
        return StreamSupport.stream(new UrlSpliterator(startingReferences, index), false);
    }

    public Stream<String> parallelStream() {
        startingReferences.add(url);
        index.add(url);
        return StreamSupport.stream(new UrlSpliterator(startingReferences, index), true);
    }

    private static class UrlSpliterator implements Spliterator<String> {

        private static final int THRESHOLD = 10;
        // variable list of urls in the spliterator
        List<String> refs;
        // distinct set of already known urls 
        Set<String> index;
        // # urls returned by the stream
        int examined = 0;
        // position of last scraped url
        int scraped = -1;

        UrlSpliterator(List<String> refs, Set<String> index) {
            this.refs = refs;
            this.index = index;

        }

        @Override
        // the stream will contain distinct urls
        public int characteristics() {
            return DISTINCT | NONNULL | IMMUTABLE;
        }

        @Override
        // stream will be infinite as web
        public long estimateSize() {
            return Long.MAX_VALUE;
        }

        @Override
        public boolean tryAdvance(Consumer<? super String> consumer) {
            if (examined < refs.size()) {
                consumer.accept(refs.get(examined));

                // when under threshold, a new url is scraped 
                // to fill the list     
                if (refs.size() - examined < THRESHOLD) {
                    scraped++;
                    analyze(refs.get(scraped));
                }

                examined++;
                return true;
            } else {
                return false;
            }
        }

        @Override
        public Spliterator<String> trySplit() {
            int n = refs.size() - examined;
            // when I have more than one url, I split the scraping
            // into two parts. New Spliterator will receive a new
            // arraylist that contains half of the urls to be
            // returned by the stream. Index will be passed as is
            // since we do not want duplicates in urls
            if (n > 1) {
                int splitPoint = examined + n / 2;

                ArrayList<String> al = new ArrayList<>();
                al.addAll(splitPoint, refs);
                for (int i = refs.size() - 1; i >= splitPoint; i--)
                    refs.remove(i);
                return new UrlSpliterator(al, index);

            }
            return null;

        }

        // this is the core of scraping, I use Jsoup to do the
        // dirty part of the job
        private void analyze(String aUrl) {
            Document doc;
            Elements links = null;
            try {

                doc = Jsoup.connect(aUrl).get();

                links = doc.select("a[href]");
                for (Element link : links) {
                    String newUrl = (String) link.attr("abs:href");
                    if (!index.contains(newUrl)) {
                        refs.add(newUrl);
                        index.add(newUrl);
                    }
                }
            } catch (IOException e) {
                ; // if a link is broken, not a problem. It can happens
                  // but I do not want to stop scraping
            }

        }

    }
}


客户端代码

UrlScraper urlScraper = new UrlScraper("http://www.wikipedia.org");
urlScraper.parallelStream()
         .forEach(System.out::println);


我的审查问题:


此代码的意图可理解吗?
我要处理多线程问题吗?
我的代码是否尊重分隔符?在characteristics中声明的属性?


评论

这是一个非常好的问题,我很高兴看到其他人也使用Java 8。不幸的是,我还不能回答这个问题,但是,一旦我完全理解了Spliterator类,我将确保重新访问它。我期待看到更多Java 8问题发布;)

#1 楼

实际上,我在这段代码中看到了一些问题。恐怕这会是一个严厉的审查,部分原因是您提到这是为教程而设计的。然后还有一些较小的问题。最关键的是:

用例

为什么不使用Iterator?由于Iterator会做这项工作,并且是传统的“交通工具”,因此您应该说明为什么Splitter是完成这项工作的正确工具。请注意,对迭代器的理解相对较好,并且使用Spliterators.spliteratorUnknownSize()从Iterator到Spliterator有明确的包装路径。明显的答案是“为什么不使用Iterator?”可以是:


它是一个教程
,用于更好地控制并行执行

在每种情况下,代码均应为“示例性”,并且应该明确说明为什么需要此代码...但是,如果答案包括“并行执行”原因,则并发最好是准确的。

forEachRemaining()
<拆分器的性能优势之一是forEachRemaining()方法,并且您选择不实施它。为什么?

队列就是队列

您的代码实际上可以归结为一个队列处理工具。 refs是一个队列,上面有内容,然后从头开始对其进行处理,然后在末尾添加内容。有时您会拆分队列。

使用队列(双端队列)。

具体地说,您可能应该使用LinkedList(这是双端队列)。

如果使用队列,则不需要examined变量,而trySplit这样的代码就变成了(这里,refs是LinkedList`):
阈值逻辑

显然没有必要在tryAdvance()中使用奇怪的逻辑来进行'scrape()`调用。

简单的东西如下:

    @Override
    public Spliterator<String> trySplit() {
        int mid = refs.size() / 2;
        if (mid < 1) {
            return null;
        }

        List<String> toclear = refs.subList(mid, refs.size());
        LinkedList<String> newrefs = new LinkedList<>(toclear);
        toclear.clear();
        return new UrlSpliterator(newrefs, index);
    }


并发错误



自定义包装器类引入了一个重大的并发错误。 UrlScraper包含整个拆分的控制信息。您提供的用例是:


        if (examined < refs.size()) {
            String url = refs.get(examined++);
            analyze(url);
            consumer.accept(url);
            return true;
        }
        return false;



通过调用stream()然后parallelStream() .....可以很容易地破坏此代码。

UrlScraper urlScraper = new UrlScraper("http://www.wikipedia.org");
urlScraper.parallelStream()
         .forEach(System.out::println);


我希望第二个流产生与第一个流相同的结果,但是第二个流将产生.....据我所知,相同的内容,再加上起始URL的额外副本。它还会重新刮擦最后的THRESHOLD页数(但假设页面没有更改,它将忽略抓取的URL)。不同的线程...


分析代码中存在竞争条件。以下内容已损坏:


 UrlScraper urlScraper = new UrlScraper("http://www.wikipedia.org");
 urlScraper.stream()
          .forEach(System.out::println);
 urlScraper.parallelStream()
          .forEach(System.out::println);



不应多次将parallelStream添加到newUrl。但是,可能有两个线程(每个线程处理一个页面,两个页面都有指向同一URL的链接)同时命中此代码。

两个线程都可以将refs的值设置为相同时间”,两个线程都会将!index.contains(newURL)添加到newUrl(即使每个线程的引用都不相同,而refs却不是。只有一个线程将成功地将newUrl添加到index

您可以利用index的返回值来利用自己的优势: Set.add()块有时会弄乱事情,但在这种情况下,它们却不会。以下代码:


            for (Element link : links) {
                String newUrl = (String) link.attr("abs:href");
                if (!index.contains(newUrl)) {
                    refs.add(newUrl);
                    index.add(newUrl);
                }
            }



应为:

             for (Element link : links) {
                 String newUrl = (String) link.attr("abs:href");
                 if (index.add(newUrl)) {
                     refs.add(newUrl);
                 }
             }


当您使用“提早返回”方法(这是一件好事...)时,您应该具有没有else块的条件。 ...代码:


        Document doc;
        Elements links = null;
        try {

            doc = Jsoup.connect(aUrl).get();

            links = doc.select("a[href]");



最好写成:

        try {
            Document doc = Jsoup.connect(aUrl).get();
            Elements links = doc.select("a[href]");


单行条件语句应仍带有大括号try ... catch。该代码:


   @Override
    public boolean tryAdvance(Consumer<? super String> consumer) {
        if (examined < refs.size()) {
            ......
            return true;
        } else {
            return false;
        }
    }



应为:

   @Override
    public boolean tryAdvance(Consumer<? super String> consumer) {
        if (examined >= refs.size()) {
            return false;
        }
        .......
        return true;
    }


分离器特性:

问题3,分隔符是否遵循在分隔符接口中指定的特征。这些是可用的特征:



{...}-可以从没有锁定控件的其他线程安全地修改源数据

CONCURRENT-没有两个值相同

DISTINCT-无法更改源

IMMUTABLE-没有值将为空

NONNULL-有序列返回结果

ORDERED-大小是有限的,并且SIZED也是准确的

estimateSize()-(也需要SORTED)序列是在某些特性上预先定义的/>
ORDERED-(也需要SUBSIZED)所有trySplit()分隔符都将被SIZED和SUBSIZED

遍历该列表,您的代码将定义SIZED。我同意DISTINCT | NONNULL | IMMUTABLEDISTINCT。这些都是“容易的”。

IMMUTABLE-可以更改源吗?线程可以更改正在爬网的页面吗?是的,我认为它不能,所以我不同意NONNULL

IMMUTABLE-分流时修改源是否安全?是的,我认为是。 CONCURRENT的文档包括:特征值,表示可以由多个线程安全地并行修改元素源(允许添加,替换和/或删除),而无需外部同步。如果是这样,则期望Spliterator拥有关于遍历期间修改的影响的文档化策略。


您的代码可以轻松地适应这一要求。您的特征应包括CONCURRENT

CONCURRENTORDEREDSORTEDSIZED不适用于您的数据。

SUBSIZEDCONCURRENT ...这些都是复杂的。它们是互斥的。如果您无法修改源数据,则无法以并行方式进行修改。如果它是并发的,那么它是可变的。

底线,您的特征应为:

评论


\ $ \ begingroup \ $
感谢您的评论。这就是我所期望的,一个艰难而公平的人。好的,阈值逻辑非常无用,我将从我的教程中将其删除。我没有看到比赛情况,很高兴您找到了比赛情况。关于forEachRemaining:我从Spliterator JavaDoc提供的示例开始,我不了解forEachRemaining与tryAdvance有何不同。而且,既然我阅读了更好的文档,就看不到如何批量处理剩余的url。
\ $ \ endgroup \ $
– Trapo
14年6月3日在20:13

\ $ \ begingroup \ $
forEachRemaming可以分批完成,其中分析了多个页面,然后“接受”它们,然后进行下一个批处理。但是,从本质上讲,此用例不是forEachRemaining()的性能优势的好例子,除了引入第二个线程进行分析,第一个线程接受(可能很慢)之外……编辑
\ $ \ endgroup \ $
–rolfl
2014年6月3日20:21



\ $ \ begingroup \ $
问题的第三点呢?
\ $ \ endgroup \ $
– Trapo
14年6月3日在21:18

\ $ \ begingroup \ $
@trapo-添加了有关特性的部分。我相信您应该具有CONCURRENT,并删除IMMUTABLE
\ $ \ endgroup \ $
–rolfl
2014年6月3日23:57

\ $ \ begingroup \ $
好吧,我不知道我是否应该同意这一点。让我解释。分离器的来源是什么?是裁判。好的,源不是一成不变的,因为分离器本身会添加数据。但是,当“元素源可以由多个线程安全地并发修改(允许添加,替换和/或删除)而无需外部同步时”,数据将被保存到未同步的LinkedList(作为您的评论)中,并且将应用CONCURRENT。
\ $ \ endgroup \ $
– Trapo
2014年6月4日5:19



#2 楼

根据我对Spliterator文档的阅读,您的代码看起来应该可以并行使用。每个URL列表仅由一个分隔符拥有,而它们都共享一组线程安全的已爬网URL。如果您担心性能,则可能要切换到ConcurrentHashMap,但是synchronizedSet对于示例代码来说就足够了。 br /> CONCURRENT:迭代过程中,某个地方的某人可能会更改Web页面。发生这种情况时,如果代码尚未抓取页面,您的代码将反映出该更改。您应该注意,此类不会检测到您已经在文档中抓取的页面的结构变化。这些网址按父级-子级顺序返回。当然,该命令有点麻烦,但这仍然是一个命令。 :)


我看到的唯一真正的问题是ORDERED列表将不受限制地增长。创建一个新列表作为未获取的URL的副本。

// when under threshold, a new url is scraped 
// to fill the list     
if (refs.size() - examined < THRESHOLD) {
    refs = new ArrayList<>(refs.subList(examined, refs.size());
    scraped = scraped - examined + 1;
    analyze(refs.get(scraped));
}