Scrapy distributed scheduling: source code and implementation

scrapy_redis.scheduler replaces Scrapy's built-in scheduler. The scheduler implements the request queue, URL deduplication, and Request management, and is responsible for scheduling the requests of every spider. When the scheduler is initialized, it reads the queue and dupefilter class types from the settings file (the defaults are usually fine) and configures the Redis keys they use (typically the spider name plus a queue or dupefilter suffix, so different instances of the same spider share the same data in Redis).
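
For reference, a minimal settings sketch that switches a project over to this scheduler might look like the following. The setting names are real scrapy-redis settings; the Redis URL is an example value, and the key templates mentioned in the comments are the library defaults in recent versions, rendered per spider as described above.

# settings.py -- minimal sketch for enabling the scrapy-redis scheduler (example values).

# Replace Scrapy's built-in scheduler and dupefilter with the Redis-backed ones.
SCHEDULER = "scrapy_redis.scheduler.Scheduler"
DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"

# Keep the queue and dupefilter in Redis between runs
# (see the close()/flush() discussion at the end of this article).
SCHEDULER_PERSIST = True

# Connection used by connection.from_settings(); adjust to your environment.
REDIS_URL = "redis://localhost:6379"

# The per-spider keys default to templates such as "%(spider)s:requests"
# and "%(spider)s:dupefilter"; they can be overridden with
# SCHEDULER_QUEUE_KEY and SCHEDULER_DUPEFILTER_KEY if needed.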

The source code of scrapy_redis.scheduler is as follows:

import importlib
import six

from scrapy.utils.misc import load_object

from . import connection, defaults


# TODO: add SCRAPY_JOB support.
class Scheduler(object):
    """Redis-based scheduler

    Settings
    --------
    SCHEDULER_PERSIST : bool (default: False)
        Whether to persist or clear redis queue.
    SCHEDULER_FLUSH_ON_START : bool (default: False)
        Whether to flush redis queue on start.
    SCHEDULER_IDLE_BEFORE_CLOSE : int (default: 0)
        How many seconds to wait before closing if no message is received.
    SCHEDULER_QUEUE_KEY : str
        Scheduler redis key.
    SCHEDULER_QUEUE_CLASS : str
        Scheduler queue class.
    SCHEDULER_DUPEFILTER_KEY : str
        Scheduler dupefilter redis key.
    SCHEDULER_DUPEFILTER_CLASS : str
        Scheduler dupefilter class.
    SCHEDULER_SERIALIZER : str
        Scheduler serializer.

    """

    def __init__(self, server,
                 persist=False,
                 flush_on_start=False,
                 queue_key=defaults.SCHEDULER_QUEUE_KEY,
                 queue_cls=defaults.SCHEDULER_QUEUE_CLASS,
                 dupefilter_key=defaults.SCHEDULER_DUPEFILTER_KEY,
                 dupefilter_cls=defaults.SCHEDULER_DUPEFILTER_CLASS,
                 idle_before_close=0,
                 serializer=None):
        """Initialize scheduler.

        Parameters
        ----------
        server : Redis
            The redis server instance.
        persist : bool
            Whether to flush requests when closing. Default is False.
        flush_on_start : bool
            Whether to flush requests on start. Default is False.
        queue_key : str
            Requests queue key.
        queue_cls : str
            Importable path to the queue class.
        dupefilter_key : str
            Duplicates filter key.
        dupefilter_cls : str
            Importable path to the dupefilter class.
        idle_before_close : int
            Timeout before giving up.

        """
        if idle_before_close < 0:
            raise TypeError("idle_before_close cannot be negative")

        self.server = server
        self.persist = persist
        self.flush_on_start = flush_on_start
        self.queue_key = queue_key
        self.queue_cls = queue_cls
        self.dupefilter_cls = dupefilter_cls
        self.dupefilter_key = dupefilter_key
        self.idle_before_close = idle_before_close
        self.serializer = serializer
        self.stats = None

    def __len__(self):
        return len(self.queue)

    @classmethod
    def from_settings(cls, settings):
        kwargs = {
            'persist': settings.getbool('SCHEDULER_PERSIST'),
            'flush_on_start': settings.getbool('SCHEDULER_FLUSH_ON_START'),
            'idle_before_close': settings.getint('SCHEDULER_IDLE_BEFORE_CLOSE'),
        }

        # If these values are missing, it means we want to use the defaults.
        optional = {
            # TODO: Use custom prefixes for this settings to note that are
            # specific to scrapy-redis.
            'queue_key': 'SCHEDULER_QUEUE_KEY',
            'queue_cls': 'SCHEDULER_QUEUE_CLASS',
            'dupefilter_key': 'SCHEDULER_DUPEFILTER_KEY',
            # We use the default setting name to keep compatibility.
            'dupefilter_cls': 'DUPEFILTER_CLASS',
            'serializer': 'SCHEDULER_SERIALIZER',
        }
        for name, setting_name in optional.items():
            val = settings.get(setting_name)
            if val:
                kwargs[name] = val

        # Support serializer as a path to a module.
        if isinstance(kwargs.get('serializer'), six.string_types):
            kwargs['serializer'] = importlib.import_module(kwargs['serializer'])

        server = connection.from_settings(settings)
        # Ensure the connection is working.
        server.ping()

        return cls(server=server, **kwargs)

    @classmethod
    def from_crawler(cls, crawler):
        instance = cls.from_settings(crawler.settings)
        # FIXME: for now, stats are only supported from this constructor
        instance.stats = crawler.stats
        return instance

    def open(self, spider):
        self.spider = spider

        try:
            self.queue = load_object(self.queue_cls)(
                server=self.server,
                spider=spider,
                key=self.queue_key % {'spider': spider.name},
                serializer=self.serializer,
            )
        except TypeError as e:
            raise ValueError("Failed to instantiate queue class %s: %s",
                             self.queue_cls, e)

        try:
            self.df = load_object(self.dupefilter_cls)(
                server=self.server,
                key=self.dupefilter_key % {'spider': spider.name},
                debug=spider.settings.getbool('DUPEFILTER_DEBUG'),
            )
        except TypeError as e:
            raise ValueError("Failed to instantiate dupefilter class %s: %s",
                             self.dupefilter_cls, e)

        if self.flush_on_start:
            self.flush()
        # notice if there are requests already in the queue to resume the crawl
        if len(self.queue):
            spider.log("Resuming crawl (%d requests scheduled)" % len(self.queue))

    def close(self, reason):
        if not self.persist:
            self.flush()

    def flush(self):
        self.df.clear()
        self.queue.clear()

    def enqueue_request(self, request):
        if not request.dont_filter and self.df.request_seen(request):
            self.df.log(request, self.spider)
            return False
        if self.stats:
            self.stats.inc_value('scheduler/enqueued/redis', spider=self.spider)
        self.queue.push(request)
        return True

    def next_request(self):
        block_pop_timeout = self.idle_before_close
        request = self.queue.pop(block_pop_timeout)
        if request and self.stats:
            self.stats.inc_value('scheduler/dequeued/redis', spider=self.spider)
        return request

    def has_pending_requests(self):
        return len(self) > 0

When the spider starts running, open() reads the configured classes from the settings and creates the Request queue object and the Request URL deduplication (dupefilter) object:

def open(self, spider):
    self.spider = spider

    try:
        self.queue = load_object(self.queue_cls)(
            server=self.server,
            spider=spider,
            key=self.queue_key % {'spider': spider.name},
            serializer=self.serializer,
        )
    except TypeError as e:
        raise ValueError("Failed to instantiate queue class %s: %s",
                         self.queue_cls, e)

    try:
        self.df = load_object(self.dupefilter_cls)(
            server=self.server,
            key=self.dupefilter_key % {'spider': spider.name},
            debug=spider.settings.getbool('DUPEFILTER_DEBUG'),
        )
    except TypeError as e:
        raise ValueError("Failed to instantiate dupefilter class %s: %s",
                         self.dupefilter_cls, e)

    if self.flush_on_start:
        self.flush()
    # notice if there are requests already in the queue to resume the crawl
    if len(self.queue):
        spider.log("Resuming crawl (%d requests scheduled)" % len(self.queue))
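
Note how each Redis key is rendered from its template using old-style string formatting with the spider name, which is why every instance of the same spider ends up sharing the same queue and dupefilter data. A quick illustration (the templates shown are the usual defaults, and "myspider" is a placeholder name):

# How the scheduler derives per-spider Redis keys from the configured templates.
queue_key_template = "%(spider)s:requests"         # e.g. defaults.SCHEDULER_QUEUE_KEY
dupefilter_key_template = "%(spider)s:dupefilter"  # e.g. defaults.SCHEDULER_DUPEFILTER_KEY

spider_name = "myspider"  # placeholder spider name
print(queue_key_template % {"spider": spider_name})       # -> myspider:requests
print(dupefilter_key_template % {"spider": spider_name})  # -> myspider:dupefilter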

The scheduler implements two important methods, one for enqueueing and one for dequeueing. When a Request is submitted to the scheduler, it is first checked against the duplicate filter (unless duplicate filtering is disabled for that request) and then pushed onto the queue:

def enqueue_request(self, request):
    if not request.dont_filter and self.df.request_seen(request):
        self.df.log(request, self.spider)
        return False
    if self.stats:
        self.stats.inc_value('scheduler/enqueued/redis', spider=self.spider)
    self.queue.push(request)
    return True
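
In other words, a request is dropped only when filtering is enabled for it and its fingerprint has already been seen. From spider code this is controlled with Scrapy's standard dont_filter flag, as in this small sketch (the spider and URLs are placeholders used only for illustration):

import scrapy

class ExampleSpider(scrapy.Spider):
    # Hypothetical spider used only to illustrate the dont_filter flag.
    name = "example"

    def parse(self, response):
        # Goes through self.df.request_seen(); a repeated URL is filtered out.
        yield scrapy.Request("http://example.com/page/1", callback=self.parse)
        # Bypasses the dupefilter: enqueue_request() pushes it even if already seen.
        yield scrapy.Request("http://example.com/page/1", callback=self.parse,
                             dont_filter=True)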

The other important method is the dequeue side, next_request. Whenever the engine asks the scheduler for the next request, next_request is called; the scheduler pops a request through the queue container's interface and hands it to the corresponding spider so the spider can crawl it.

def next_request(self):
    block_pop_timeout = self.idle_before_close
    request = self.queue.pop(block_pop_timeout)
    if request and self.stats:
        self.stats.inc_value('scheduler/dequeued/redis', spider=self.spider)
    return request
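
The timeout passed to queue.pop() comes from idle_before_close (the SCHEDULER_IDLE_BEFORE_CLOSE setting); with a non-zero value the pop can block on Redis waiting for a request instead of returning immediately. A rough sketch of what a FIFO-style Redis pop with an optional blocking timeout can look like (this is an illustration under assumed key names, not the exact scrapy_redis.queue code):

import redis

# Illustration only: a FIFO pop with an optional blocking timeout, similar in
# spirit to what the scrapy-redis queue classes do with the scheduler's timeout.
server = redis.StrictRedis.from_url("redis://localhost:6379")  # example URL
key = "example:requests"  # hypothetical queue key

def pop(timeout=0):
    if timeout > 0:
        # BRPOP blocks for up to `timeout` seconds waiting for an item.
        result = server.brpop(key, timeout)
        data = result[1] if result else None
    else:
        # Non-blocking pop returns immediately (None if the queue is empty).
        data = server.rpop(key)
    return data  # raw serialized request, decoded by the scheduler's queue class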

When the spider closes, the persist parameter decides whether the queue is flushed. Its docstring reads: "persist : bool. Whether to flush requests when closing. Default is False." So the default is False, but in practice SCHEDULER_PERSIST = True is usually set in the settings file so that the Redis queue and dupefilter are not cleared:

def close(self, reason):
    if not self.persist:
        self.flush()

def flush(self):
    self.df.clear()
    self.queue.clear()
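
Concretely, flush() clears both the dupefilter and the queue, which boils down to deleting the spider's two Redis keys, so an interrupted crawl cannot be resumed afterwards. With SCHEDULER_PERSIST = True the keys survive and the next run picks up the pending requests. A small sketch for inspecting what is left in Redis between runs (the key names assume the default templates and a placeholder spider called "myspider"):

import redis

# Quick look at what the scheduler leaves behind in Redis between runs.
server = redis.StrictRedis.from_url("redis://localhost:6379")  # example URL

for key in ("myspider:requests", "myspider:dupefilter"):
    # The queue key is a list or sorted set depending on the queue class;
    # the dupefilter key is a set of request fingerprints.
    print(key, server.type(key), server.exists(key))

# flush() (df.clear() plus queue.clear()) is roughly equivalent to:
# server.delete("myspider:requests", "myspider:dupefilter")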
