scrapy-twisted analysis

    1. General
    2. The Twisted asynchronous framework
    • 2.1. The reactor design pattern
    • 2.2. Deferred objects
    • 2.3. What @defer.inlineCallbacks does

1. General

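Scrapy is built on top of the asynchronous networking framework Twisted. This chapter explains how a set of Twisted's asynchronous APIs (reactor, Deferred, DeferredList, and @defer.inlineCallbacks) are used in Scrapy.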

2. The Twisted asynchronous framework

2.1. The reactor design pattern

Not covered in detail for now; still under study.

2.2. Deferred objects

API: https://twistedmatrix.com/documents/10.1.0/core/howto/defer.html

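  • Internally, a Deferred keeps its callbacks in a list of tuples ([tuple()]). Each entry pairs a (callback, args, kwargs) triple with an (errback, args, kwargs) triple, as addCallbacks shows: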
def addCallbacks(self, callback, errback=None,
                 callbackArgs=None, callbackKeywords=None,
                 errbackArgs=None, errbackKeywords=None):
    """
    Add a pair of callbacks (success and error) to this L{Deferred}.

    These will be executed when the 'master' callback is run.

    @return: C{self}.
    @rtype: a L{Deferred}
    """
    assert callable(callback)
    assert errback is None or callable(errback)
    cbs = ((callback, callbackArgs, callbackKeywords),
           (errback or (passthru), errbackArgs, errbackKeywords))
    self.callbacks.append(cbs)

    if self.called:
        self._runCallbacks()
    return self
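To make the pair layout concrete, here is a minimal sketch in plain Python. PairStore is a hypothetical stand-in written for this illustration, not Twisted's class; it only mimics the registration side, assuming that addCallback, addErrback and addBoth simply delegate to addCallbacks with different arguments.

```python
def passthru(arg):
    """Return the argument unchanged (the default for a missing slot)."""
    return arg


class PairStore:
    """Toy stand-in for the registration side of a Deferred (not Twisted code)."""

    def __init__(self):
        # One entry per registration: (callback triple, errback triple).
        self.callbacks = []

    def addCallbacks(self, callback, errback=None,
                     callbackArgs=None, callbackKeywords=None,
                     errbackArgs=None, errbackKeywords=None):
        cbs = ((callback, callbackArgs, callbackKeywords),
               (errback or passthru, errbackArgs, errbackKeywords))
        self.callbacks.append(cbs)
        return self

    def addCallback(self, callback, *args, **kw):
        # Success slot only; the error slot falls back to passthru.
        return self.addCallbacks(callback, callbackArgs=args,
                                 callbackKeywords=kw)

    def addErrback(self, errback, *args, **kw):
        # Error slot only; the success slot is passthru.
        return self.addCallbacks(passthru, errback,
                                 errbackArgs=args, errbackKeywords=kw)

    def addBoth(self, callback, *args, **kw):
        # The same callable fills both slots.
        return self.addCallbacks(callback, callback,
                                 callbackArgs=args, errbackArgs=args,
                                 callbackKeywords=kw, errbackKeywords=kw)


def on_ok(result):
    return result


def on_err(fail):
    return fail


store = PairStore().addCallback(on_ok).addErrback(on_err).addBoth(on_ok)
# Show which function sits in the success slot and the error slot of each pair.
slots = [(ok[0].__name__, err[0].__name__) for ok, err in store.callbacks]
print(slots)
# [('on_ok', 'passthru'), ('passthru', 'on_err'), ('on_ok', 'on_ok')]
```

Every registration adds exactly one pair, so the success path and the error path always stay the same length and can be walked in lockstep.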
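  • addCallbacks is the core: it appends a callback pair to the callbacks list. addCallback registers (callback, passthru), addErrback registers (passthru, errback), and addBoth registers the same callable in both slots, (callback, callback). All three delegate to addCallbacks and differ only in the arguments they pass. Next, look at the _runCallbacks function: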
# Core logic
def _runCallbacks(self):
    ...  # omitted
    # self is the Deferred being fired; chain is a list of Deferreds
    chain = [self]
    while chain:
        # Use the list as a stack: take the Deferred on top
        current = chain[-1]
        ...  # omitted
        while current.callbacks:
            # Take the next callback pair from its callbacks list
            item = current.callbacks.pop(0)
            # False selects the callback triple, True the errback triple
            callback, args, kw = item[
                isinstance(current.result, failure.Failure)]
            args = args or ()
            kw = kw or {}
            try:
                current._runningCallbacks = True
                try:
                    # Invoke the callback with the previous result and store the new result
                    current.result = callback(current.result, *args, **kw)
                    ...  # omitted
                finally:
                    current._runningCallbacks = False
            except:
                ...  # omitted
            else:
                ...  # omitted
        if finished:
            ...  # omitted
            # This Deferred is done: pop the last ([-1]) object off the stack
            chain.pop()
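The slot selection in the inner loop can be tried out in isolation. The sketch below is a simplified model, not Twisted's code: Failure here is a toy wrapper class, each list entry is a plain (callback, errback) pair without the args and kwargs, and pausing and chained Deferreds are left out. The function names parse, double and recover are made up for the example.

```python
class Failure:
    """Toy stand-in for twisted.python.failure.Failure: wraps an exception."""

    def __init__(self, value):
        self.value = value


def passthru(arg):
    return arg


def run_callbacks(result, callbacks):
    """Drain the pair list roughly the way the inner while loop above does."""
    while callbacks:
        item = callbacks.pop(0)
        # A bool indexes the pair: False -> callback slot, True -> errback slot.
        func = item[isinstance(result, Failure)]
        try:
            result = func(result)
        except Exception as exc:
            # A raising callback turns the running result into a Failure.
            result = Failure(exc)
    return result


trace = []


def parse(text):
    trace.append("parse")
    return int(text)  # raises ValueError for non-numeric input


def double(number):
    trace.append("double")
    return number * 2


def recover(fail):
    trace.append("recover")
    return -1


def make_chain():
    # Same layout addCallback / addCallback / addErrback would produce.
    return [(parse, passthru), (double, passthru), (passthru, recover)]


ok = run_callbacks("21", make_chain())
bad = run_callbacks("oops", make_chain())
print(ok, bad, trace)
# 42 -1 ['parse', 'double', 'parse', 'recover']
```

In the failing run, double is skipped because the result is already a Failure when its pair is reached, so the passthru in the error slot just forwards it until recover handles it.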

2.3. What @defer.inlineCallbacks does

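@defer.inlineCallbacks wraps a generator function so that calling it returns a Deferred, which avoids writing Deferreds and callbacks by hand. On invocation it starts the generator and checks whether the yielded value is a Deferred. If it is, _inlineCallbacks is called again once that Deferred fires, and the result is sent back into the generator.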

@failure._extraneous
def _inlineCallbacks(result, g, status):
    waiting = [True,  # waiting for result?
               None]  # result

    while 1:
        try:
            # Send the last result back as the result of the yield expression.
            isFailure = isinstance(result, failure.Failure)
            if isFailure:
                result = result.throwExceptionIntoGenerator(g)
            else:
                result = g.send(result)
        except StopIteration as e:
            # fell off the end, or "return" statement
            status.deferred.callback(getattr(e, "value", None))
            return
        except _DefGen_Return as e:
            ...  # omitted
        except:
            status.deferred.errback()
            return
        # If a Deferred was yielded, call _inlineCallbacks recursively once it fires
        if isinstance(result, Deferred):
            # a deferred was yielded, get the result.
            def gotResult(r):
                if waiting[0]:
                    waiting[0] = False
                    waiting[1] = r
                else:
                    # We are not waiting for deferred result any more
                    _inlineCallbacks(r, g, status)

            result.addBoth(gotResult)
            if waiting[0]:
                # Haven't called back yet, set flag so that we get reinvoked
                # and return from the loop
                waiting[0] = False
                status.waitingOn = result
                return

            result = waiting[1]

            waiting[0] = True
            waiting[1] = None
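The resume-on-fire idea can be sketched without Twisted. Everything below is hypothetical and written for illustration: TinyDeferred and inline are toy names, error handling is omitted, and the toy checks a called flag where the real code uses the waiting list to detect a Deferred that fires synchronously.

```python
class TinyDeferred:
    """Toy Deferred: holds one result and notifies listeners when fired."""

    def __init__(self):
        self.called = False
        self.result = None
        self._listeners = []

    def addBoth(self, func):
        if self.called:
            func(self.result)
        else:
            self._listeners.append(func)
        return self

    def callback(self, result):
        self.called = True
        self.result = result
        listeners, self._listeners = self._listeners, []
        for func in listeners:
            func(result)


def inline(gen_func):
    """Toy decorator: drive a generator, resuming it when each yielded
    TinyDeferred fires, and return a TinyDeferred for the final value."""
    def wrapper(*args, **kwargs):
        done = TinyDeferred()
        gen = gen_func(*args, **kwargs)

        def step(value):
            while True:
                try:
                    yielded = gen.send(value)
                except StopIteration as stop:
                    # Generator returned: fire the outer deferred.
                    done.callback(stop.value)
                    return
                if isinstance(yielded, TinyDeferred):
                    if not yielded.called:
                        # Not ready yet: re-enter step() once it fires.
                        yielded.addBoth(step)
                        return
                    value = yielded.result
                else:
                    value = yielded

        step(None)
        return done
    return wrapper


pending = TinyDeferred()


@inline
def fetch_length():
    body = yield pending       # suspends until pending fires
    ready = TinyDeferred()
    ready.callback(len(body))
    size = yield ready         # already fired: the loop continues at once
    return size + 1


outcome = fetch_length()
before = outcome.called
pending.callback("hello")
print(before, outcome.called, outcome.result)
# False True 6
```

The loop handles already-fired values without recursion, and the re-entry through addBoth only happens when the generator really has to wait, which is the same split the while loop and gotResult make in the listing above.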