Spliterator
类上发布一些教程。如今,有很多教程都是关于从标准Java集合开始使用流的,但是我想探索使用来自网络的数据创建流的方法(因此,没有流完全由集合支持)。 >我决定实现一个来自起始地址的无限URL流。我要做的是抓取初始页面,并向客户端代码提供在该页面中找到的URL。然后,我用新的URL重复抓取。返回给客户端的URL不会重复。UlScraper
package com.stackexchange.codereview.webscrapingwithstream;
import java.io.IOException;
import java.util.*;
import java.util.function.Consumer;
import java.util.stream.*;
import org.jsoup.Jsoup;
import org.jsoup.nodes.*;
import org.jsoup.select.Elements;
public class UrlScraper {
private String url;
private Set<String> index = Collections.synchronizedSet(new TreeSet<>());
private List<String> startingReferences = new ArrayList<>();
public UrlScraper(String url) {
this.url = url;
}
public Stream<String> stream() {
startingReferences.add(url);
index.add(url);
return StreamSupport.stream(new UrlSpliterator(startingReferences, index), false);
}
public Stream<String> parallelStream() {
startingReferences.add(url);
index.add(url);
return StreamSupport.stream(new UrlSpliterator(startingReferences, index), true);
}
private static class UrlSpliterator implements Spliterator<String> {
private static final int THRESHOLD = 10;
// variable list of urls in the spliterator
List<String> refs;
// distinct set of already known urls
Set<String> index;
// # urls returned by the stream
int examined = 0;
// position of last scraped url
int scraped = -1;
UrlSpliterator(List<String> refs, Set<String> index) {
this.refs = refs;
this.index = index;
}
@Override
// the stream will contain distinct urls
public int characteristics() {
return DISTINCT | NONNULL | IMMUTABLE;
}
@Override
// stream will be infinite as web
public long estimateSize() {
return Long.MAX_VALUE;
}
@Override
public boolean tryAdvance(Consumer<? super String> consumer) {
if (examined < refs.size()) {
consumer.accept(refs.get(examined));
// when under threshold, a new url is scraped
// to fill the list
if (refs.size() - examined < THRESHOLD) {
scraped++;
analyze(refs.get(scraped));
}
examined++;
return true;
} else {
return false;
}
}
@Override
public Spliterator<String> trySplit() {
int n = refs.size() - examined;
// when I have more than one url, I split the scraping
// into two parts. New Spliterator will receive a new
// arraylist that contains half of the urls to be
// returned by the stream. Index will be passed as is
// since we do not want duplicates in urls
if (n > 1) {
int splitPoint = examined + n / 2;
ArrayList<String> al = new ArrayList<>();
al.addAll(splitPoint, refs);
for (int i = refs.size() - 1; i >= splitPoint; i--)
refs.remove(i);
return new UrlSpliterator(al, index);
}
return null;
}
// this is the core of scraping, I use Jsoup to do the
// dirty part of the job
private void analyze(String aUrl) {
Document doc;
Elements links = null;
try {
doc = Jsoup.connect(aUrl).get();
links = doc.select("a[href]");
for (Element link : links) {
String newUrl = (String) link.attr("abs:href");
if (!index.contains(newUrl)) {
refs.add(newUrl);
index.add(newUrl);
}
}
} catch (IOException e) {
; // if a link is broken, not a problem. It can happens
// but I do not want to stop scraping
}
}
}
}
客户端代码
UrlScraper urlScraper = new UrlScraper("http://www.wikipedia.org");
urlScraper.parallelStream()
.forEach(System.out::println);
我的审查问题:
此代码的意图可理解吗?
我要处理多线程问题吗?
我的代码是否尊重分隔符?在
characteristics
中声明的属性?#1 楼
实际上,我在这段代码中看到了一些问题。恐怕这会是一个严厉的审查,部分原因是您提到这是为教程而设计的。然后还有一些较小的问题。最关键的是:用例
为什么不使用Iterator?由于Iterator会做这项工作,并且是传统的“交通工具”,因此您应该说明为什么Splitter是完成这项工作的正确工具。请注意,对迭代器的理解相对较好,并且使用Spliterators.spliteratorUnknownSize()从Iterator到Spliterator有明确的包装路径。明显的答案是“为什么不使用Iterator?”可以是:
它是一个教程
,用于更好地控制并行执行
在每种情况下,代码均应为“示例性”,并且应该明确说明为什么需要此代码...但是,如果答案包括“并行执行”原因,则并发最好是准确的。
forEachRemaining()
<拆分器的性能优势之一是
forEachRemaining()
方法,并且您选择不实施它。为什么?队列就是队列
您的代码实际上可以归结为一个队列处理工具。
refs
是一个队列,上面有内容,然后从头开始对其进行处理,然后在末尾添加内容。有时您会拆分队列。使用队列(双端队列)。
具体地说,您可能应该使用LinkedList(这是双端队列)。
如果使用队列,则不需要
examined
变量,而trySplit这样的代码就变成了(这里,refs
是LinkedList`):阈值逻辑
显然没有必要在
tryAdvance()
中使用奇怪的逻辑来进行'scrape()`调用。简单的东西如下:
@Override
public Spliterator<String> trySplit() {
int mid = refs.size() / 2;
if (mid < 1) {
return null;
}
List<String> toclear = refs.subList(mid, refs.size());
LinkedList<String> newrefs = new LinkedList<>(toclear);
toclear.clear();
return new UrlSpliterator(newrefs, index);
}
并发错误
自定义包装器类引入了一个重大的并发错误。
UrlScraper
包含整个拆分的控制信息。您提供的用例是: if (examined < refs.size()) {
String url = refs.get(examined++);
analyze(url);
consumer.accept(url);
return true;
}
return false;
通过调用
stream()
然后parallelStream()
.....可以很容易地破坏此代码。 UrlScraper urlScraper = new UrlScraper("http://www.wikipedia.org");
urlScraper.parallelStream()
.forEach(System.out::println);
我希望第二个流产生与第一个流相同的结果,但是第二个流将产生.....据我所知,相同的内容,再加上起始URL的额外副本。它还会重新刮擦最后的THRESHOLD页数(但假设页面没有更改,它将忽略抓取的URL)。不同的线程...
分析代码中存在竞争条件。以下内容已损坏:
UrlScraper urlScraper = new UrlScraper("http://www.wikipedia.org");
urlScraper.stream()
.forEach(System.out::println);
urlScraper.parallelStream()
.forEach(System.out::println);
不应多次将
parallelStream
添加到newUrl
。但是,可能有两个线程(每个线程处理一个页面,两个页面都有指向同一URL的链接)同时命中此代码。两个线程都可以将
refs
的值设置为相同时间”,两个线程都会将!index.contains(newURL)
添加到newUrl
(即使每个线程的引用都不相同,而refs
却不是。只有一个线程将成功地将newUrl添加到index
。您可以利用
index
的返回值来利用自己的优势: Set.add()
块有时会弄乱事情,但在这种情况下,它们却不会。以下代码: for (Element link : links) {
String newUrl = (String) link.attr("abs:href");
if (!index.contains(newUrl)) {
refs.add(newUrl);
index.add(newUrl);
}
}
应为:
for (Element link : links) {
String newUrl = (String) link.attr("abs:href");
if (index.add(newUrl)) {
refs.add(newUrl);
}
}
当您使用“提早返回”方法(这是一件好事...)时,您应该具有没有else块的条件。 ...代码:
Document doc;
Elements links = null;
try {
doc = Jsoup.connect(aUrl).get();
links = doc.select("a[href]");
最好写成:
try {
Document doc = Jsoup.connect(aUrl).get();
Elements links = doc.select("a[href]");
单行条件语句应仍带有大括号
try ... catch
。该代码: @Override
public boolean tryAdvance(Consumer<? super String> consumer) {
if (examined < refs.size()) {
......
return true;
} else {
return false;
}
}
应为:
@Override
public boolean tryAdvance(Consumer<? super String> consumer) {
if (examined >= refs.size()) {
return false;
}
.......
return true;
}
分离器特性:
问题3,分隔符是否遵循在分隔符接口中指定的特征。这些是可用的特征:
{...}
-可以从没有锁定控件的其他线程安全地修改源数据CONCURRENT
-没有两个值相同DISTINCT
-无法更改源IMMUTABLE
-没有值将为空NONNULL
-有序列返回结果ORDERED
-大小是有限的,并且SIZED
也是准确的estimateSize()
-(也需要SORTED
)序列是在某些特性上预先定义的/> ORDERED
-(也需要SUBSIZED
)所有trySplit()分隔符都将被SIZED和SUBSIZED 遍历该列表,您的代码将定义
SIZED
。我同意DISTINCT | NONNULL | IMMUTABLE
和DISTINCT
。这些都是“容易的”。IMMUTABLE-可以更改源吗?线程可以更改正在爬网的页面吗?是的,我认为它不能,所以我不同意
NONNULL
。IMMUTABLE
-分流时修改源是否安全?是的,我认为是。 CONCURRENT
的文档包括:特征值,表示可以由多个线程安全地并行修改元素源(允许添加,替换和/或删除),而无需外部同步。如果是这样,则期望Spliterator拥有关于遍历期间修改的影响的文档化策略。您的代码可以轻松地适应这一要求。您的特征应包括
CONCURRENT
。CONCURRENT
,ORDERED
,SORTED
和SIZED
不适用于您的数据。SUBSIZED
和CONCURRENT
...这些都是复杂的。它们是互斥的。如果您无法修改源数据,则无法以并行方式进行修改。如果它是并发的,那么它是可变的。底线,您的特征应为:
评论
\ $ \ begingroup \ $
感谢您的评论。这就是我所期望的,一个艰难而公平的人。好的,阈值逻辑非常无用,我将从我的教程中将其删除。我没有看到比赛情况,很高兴您找到了比赛情况。关于forEachRemaining:我从Spliterator JavaDoc提供的示例开始,我不了解forEachRemaining与tryAdvance有何不同。而且,既然我阅读了更好的文档,就看不到如何批量处理剩余的url。
\ $ \ endgroup \ $
– Trapo
14年6月3日在20:13
\ $ \ begingroup \ $
forEachRemaming可以分批完成,其中分析了多个页面,然后“接受”它们,然后进行下一个批处理。但是,从本质上讲,此用例不是forEachRemaining()的性能优势的好例子,除了引入第二个线程进行分析,第一个线程接受(可能很慢)之外……编辑
\ $ \ endgroup \ $
–rolfl
2014年6月3日20:21
\ $ \ begingroup \ $
问题的第三点呢?
\ $ \ endgroup \ $
– Trapo
14年6月3日在21:18
\ $ \ begingroup \ $
@trapo-添加了有关特性的部分。我相信您应该具有CONCURRENT,并删除IMMUTABLE
\ $ \ endgroup \ $
–rolfl
2014年6月3日23:57
\ $ \ begingroup \ $
好吧,我不知道我是否应该同意这一点。让我解释。分离器的来源是什么?是裁判。好的,源不是一成不变的,因为分离器本身会添加数据。但是,当“元素源可以由多个线程安全地并发修改(允许添加,替换和/或删除)而无需外部同步时”,数据将被保存到未同步的LinkedList(作为您的评论)中,并且将应用CONCURRENT。
\ $ \ endgroup \ $
– Trapo
2014年6月4日5:19
#2 楼
根据我对Spliterator
文档的阅读,您的代码看起来应该可以并行使用。每个URL列表仅由一个分隔符拥有,而它们都共享一组线程安全的已爬网URL。如果您担心性能,则可能要切换到ConcurrentHashMap
,但是synchronizedSet
对于示例代码来说就足够了。 br /> CONCURRENT
:迭代过程中,某个地方的某人可能会更改Web页面。发生这种情况时,如果代码尚未抓取页面,您的代码将反映出该更改。您应该注意,此类不会检测到您已经在文档中抓取的页面的结构变化。这些网址按父级-子级顺序返回。当然,该命令有点麻烦,但这仍然是一个命令。 :) 我看到的唯一真正的问题是
ORDERED
列表将不受限制地增长。创建一个新列表作为未获取的URL的副本。// when under threshold, a new url is scraped
// to fill the list
if (refs.size() - examined < THRESHOLD) {
refs = new ArrayList<>(refs.subList(examined, refs.size());
scraped = scraped - examined + 1;
analyze(refs.get(scraped));
}
评论
这是一个非常好的问题,我很高兴看到其他人也使用Java 8。不幸的是,我还不能回答这个问题,但是,一旦我完全理解了Spliterator类,我将确保重新访问它。我期待看到更多Java 8问题发布;)