数据和处理管线

Scrapy框架中提供了scrapy.Item数据模型类,它负责定义和封装抓取到的结构化数据,并交由后续的数据处理管线(Item Pipeline)进行清洗、验证、存储等操作,Scrapy框架的这种封装解耦了数据爬取逻辑和数据处理逻辑,使得我们的程序结构更加清晰。这篇笔记我们详细学习数据模型类和数据处理管线相关的用法。

Item 数据模型定义

自定义的Item数据模型类需要继承scrapy.Item基类,此外我们还使用了类属性定义数据模型的字段,scrapy.Field()的作用其实类似一个占位符,告知Scrapy框架该数据模型有哪些数据字段以及它们的字段名,Scrapy中 字段的值可以是任意类型的数据。

import scrapy


class QuoteItem(scrapy.Item):
    text = scrapy.Field()
    author = scrapy.Field()
    tags = scrapy.Field()

scrapy.Item类本质上其实是实现了MutableMapping并在内部封装了dict,它的API基本也是仿照dict设计的,下面我们使用Scrapy Shell演示一些简单的用法。

使用数据模型

创建Item

下面例子使用命名参数创建Item实例。

quote = QuoteItem(text='Hello, world!', author='Gacfox', tags=['thinking', 'world'])

此外我们也可以基于dict来创建Item实例。

quote = QuoteItem({'text': 'Hello, world!', 'author': 'Gacfox', 'tags': ['thinking', 'world']})

获取字段值

从Item实例中获取字段值的用法和从dict获取值类似,我们可以使用[]操作符也可以使用get()方法。不过我个人习惯上一般来说更推荐使用get()方法,它在键不存在时会返回None,而前者会抛出键不存在异常,此外get()方法还支持设置默认值。

quote['text']
quote.get('text')
quote.get('price', 'undefined key')

下面例子写法用于判断数据模型中是否包含某个字段,这里fields是一个类属性,因此我们可以直接通过类名访问。

'text' in QuoteItem.fields

单独设置字段值

在Item实例上我们可以使用如下方法单独设置某一个字段的值。

quote['text'] = 'Hello, world!'

获取键集合与值集合

keys()方法和items()方法分别用于返回Item的键列表和键值对列表。

quote.keys()
quote.items()

其它常见用法

下面例子实现了复制Item实例。

quote2 = quote.copy()

下面例子将Item转为dict

dict(quote)

在Spider中使用数据模型

通过前面学习我们已经掌握了数据模型的定义和各种操作方法,这里我们继续看一个例子,下面代码中我们在Spider内部抓取网页数据,并通过生成器yield返回了数据模型的实例对象。

tutorial/items.py

import scrapy


class QuoteItem(scrapy.Item):
    text = scrapy.Field()
    author = scrapy.Field()
    tags = scrapy.Field()

tutorial/spiders/quote_spider.py

import scrapy

from tutorial.items import QuoteItem


class QuotesSpider(scrapy.Spider):
    name = 'quotes'
    start_urls = [
        'https://quotes.toscrape.com/page/1/',
    ]

    def parse(self, response, **kwargs):
        for quote in response.css('div.quote'):
            text = quote.css('span.text::text').get()
            author = quote.css('small.author::text').get()
            tags = quote.css('div.tags a.tag::text').getall()
            result = QuoteItem(text=text, author=author, tags=tags)
            yield result

        next_page = response.css('li.next a::attr(href)').get()
        if next_page is not None:
            yield response.follow(next_page, callback=self.parse)

Spider中生成的数据模型实例最终会交由数据处理管线进行加工或存储,这部分我们后文马上就会介绍到。另外一点我们要注意的是,在Spider内部我们不应编写任何阻塞式IO的代码,Scrapy框架本身是基于Twisted实现的,所有IO操作都是异步的,事件循环负责调度回调函数使多个网络请求可以被并发处理,而如果你在Spider组件中编写阻塞式IO代码会阻塞整个事件循环,降低程序的并发效率。

当然,Scrapy在网络中通常充当客户端角色而非服务端,偶尔编写一些错误的阻塞式代码可能造成的影响并不明显,但我们要知道有这样一回事,并在编码中尽量遵循最佳实践。

数据处理管线

前面我们学习了Item数据模型的定义并演示了如何在Spider中返回数据模型实例,这里我们继续学习如何对这些数据进行后处理。Scrapy中,数据处理是由Item Pipeline数据处理管线实现的,多个处理方法可以按顺序组合成链,这也是它被称为“Pipeline(管线)”的原因。Item Pipeline主要用于数据清洗、验证、存储、触发外部API等场景。

定义Item Pipeline例子

这里我们看一个例子,下面代码中我们定义了一个叫QuotePipeline的类,管线类必须包含名为process_item()的方法,这里其内部逻辑比较简单,就是使用日志打印收到的数据。

tutorial/pipelines.py

import logging


class QuotePipeline(object):
    def __init__(self):
        self.log = logging.getLogger(__name__)

    def process_item(self, item, spider):
        self.log.info(f'Processing {item}')
        return item

单纯定义管线类还不够,我们还需要在settings.py中配置管线类,配置的信息包括类名和优先级。优先级是一个整数,数字越小优先级越高(越先执行)。

tutorial/settings.py

ITEM_PIPELINES = {
   'tutorial.pipelines.QuotePipeline': 300,
}

settings.py中配置的管线类是全局生效的,也就是说如果我们有多个Spider,每个Spider产生的Item数据都会在所有全局管线中处理一遍。如果我们需要为某个Spider配置特定的管线类,可以在Spider中单独配置。

class QuotesSpider(scrapy.Spider):
    name = 'quotes'
    custom_settings = {
        'ITEM_PIPELINES': {
            'tutorial.pipelines.QuotePipeline': 300,
        }
    }
    # ... 其它配置和具体执行逻辑

Item Pipeline类详解

虽然Item Pipeline类是一个继承object的普通类,但一个标准的Item Pipeline类其实可以定义一些生命周期钩子方法,我们将其设置为指定的方法名,它们就会在恰当的时候被回调。

class DemoPipeline(object):
    def __init__(self):
        # 类构造方法

    @classmethod
    def from_crawler(cls, crawler):
        # 可以用来访问Scrapy框架核心组件的类方法
        return cls()

    def open_spider(self, spider):
        # 爬虫启动时回调,可以用于初始化资源

    def close_spider(self, spider):
        # 爬虫关闭时回调,可以用于回收资源

    def process_item(self, item, spider):
        # 每次收到Item实例时回调,用于具体处理数据
        return item

from_crawler():这个类方法能够访问Scrapy框架核心组件,参数中cls表示这个类本身,crawler是框架核心对象。这个方法的一个最常见的用途就是从crawler.settings读取settings.py中的配置并传入Item Pipeline类的实例。Pipeline Item类的__init__()其实可以包含构造参数,此时我们用形如return cls(config=xxx)的写法即可传入这些配置信息。

open_spider()和close_spider():分别用来在爬虫初始化和执行完成后回调。当发生断点续爬时,close_spider()open_spider()也会在爬虫关闭和重新开始运行时被回调。

process_item():具体处理数据的回调方法,参数item正是Item数据模型对象。

数据修改或丢弃

上面代码我们在process_item()方法内处理完成后又返回了数据模型实例,这样后续的管线还会再次收到数据并进行处理,实际上process_item()方法内也可以修改或丢弃数据,通过修改和丢弃数据我们可以实现数据的初步清洗,如果想丢弃数据我们只需使用raise抛出DropItem异常即可。

def process_item(self, item, spider):
    raise DropItem(f'Removed: {item}')

持久化数据

在Scrapy中,持久化抓取到的数据需要我们自己在Item Pipeline中实现,我们可以选择传统关系型数据库配合ORM框架或原生SQL语句来存储,也可以采用MongoDB等文档型数据库,或者更简单的直接存储为JSON或JSONL文件,实际开发中需要根据我们的数据规模和后续使用方式进行合理选择,这里我们选择使用SQLAlchemy存储数据到MySQL数据库作为例子。我们最终存储到MySQL里的表结构如下,其中格言表t_quote和标签表t_tag具有多对多关系。

CREATE TABLE `t_quote` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `text` varchar(255) NOT NULL,
  `author` varchar(50) NOT NULL,
  PRIMARY KEY (`id`)
);
CREATE TABLE `t_tag` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `name` varchar(50) NOT NULL,
  PRIMARY KEY (`id`)
);
CREATE TABLE `t_quote_tag` (
  `quote_id` bigint NOT NULL,
  `tag_id` bigint NOT NULL,
  PRIMARY KEY (`quote_id`,`tag_id`)
);

虽然Scrapy框架本身使用了一套Item数据模型,但它还无法直接对应到关系型数据库的数据模型上,这里我们首先需要定义SQLAlchemy数据模型。

tutorial/models.py

from typing import List

from sqlalchemy import BigInteger, String, Table, Column, ForeignKey
from sqlalchemy.orm import mapped_column, DeclarativeBase, Mapped, relationship


class Base(DeclarativeBase):
    pass


quote_tag = Table(
    't_quote_tag',
    Base.metadata,
    Column('quote_id', ForeignKey('t_quote.id'), primary_key=True),
    Column('tag_id', ForeignKey('t_tag.id'), primary_key=True),
)


class Quote(Base):
    __tablename__ = 't_quote'

    id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
    text: Mapped[str] = mapped_column(String(255), nullable=False)
    author: Mapped[str] = mapped_column(String(50), nullable=False)
    tags: Mapped[List['Tag']] = relationship(secondary=quote_tag)

    def __repr__(self):
        return f"Quote(id={self.id}, text='{self.text}', author='{self.author}')"


class Tag(Base):
    __tablename__ = 't_tag'

    id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
    name: Mapped[str] = mapped_column(String(50), nullable=False)

    def __repr__(self):
        return f"Tag(id={self.id}, name='{self.name}')"

接下来是存储数据的管线类,我们这里在类的open_spider方法中初始化了操作数据库需要的数据库引擎和会话工厂对象,close_spider()方法中我们安全的关闭数据库引擎对象,在具体处理数据的process_item()方法中我们实现了保存标签和格言对象的逻辑。创建数据管线类后别忘了在settings.py中配置并设置一个合理的执行顺序。

tutorial/pipelines.py

import logging

from sqlalchemy import create_engine, select
from sqlalchemy.orm import sessionmaker

from tutorial.models import Quote, Tag, Base


class SaveQuotePipeline(object):
    def __init__(self, db_conn_str):
        self.log = logging.getLogger(__name__)
        self.db_conn_str = db_conn_str
        self.engine = None
        self.Session = None

    @classmethod
    def from_crawler(cls, crawler):
        db_conn_str = crawler.settings.get('DATABASE_URL')
        return cls(db_conn_str=db_conn_str)

    def open_spider(self, spider):
        self.engine = create_engine(self.db_conn_str, echo=True, future=True)
        self.Session = sessionmaker(autocommit=False, autoflush=False, bind=self.engine)
        Base.metadata.create_all(self.engine)

    def close_spider(self, spider):
        self.engine.dispose()

    def process_item(self, item, spider):
        with self.Session() as session:
            with session.begin():
                tags = []
                for tag_str in item.get('tags', []):
                    if tag_str is not None:
                        stmt = select(Tag).where(Tag.name == tag_str)
                        tag = session.scalars(stmt).first()
                        if tag is None:
                            tag = Tag(name=tag_str)
                            session.add(tag)
                        tags.append(tag)
                quote = Quote(text=item.get('text'), author=item.get('author'), tags=tags)
                session.add(quote)
                self.log.info(f'Saved: {item}')
        return item

代码逻辑并不复杂,我们首先查询标签表,如果标签不存在则需要新创建,随后保存格言对象。如果你对SQLAlchemy不太熟悉,可以参考软件工程/Python/SQLAlchemy章节。

Scrapy内置Item Pipeline

Scrapy框架中包含了两个内置的Item Pipeline(默认并没有开启),分别是ImagesPipelineFilesPipeline,它们可以下载图片和普通文件。如果想要使用这两个Item Pipeline需要手动开启并添加存储路径设置。

ITEM_PIPELINES = {
    'scrapy.pipelines.images.ImagesPipeline': 300,
    'scrapy.pipelines.files.FilesPipeline': 400,
}

# 设置文件存储路径
IMAGES_STORE = '/path/to/images/dir'
FILES_STORE = '/path/to/files/dir'

ImagesPipelineFilesPipeline这两个Item Pipeline的内部逻辑类似,当我们的Spider生成的Item数据模型包含image_urlsfile_urls字段时就会分别触发这两个管线下载图片或文件,下载的内容最终会保存到之前设置的IMAGES_STOREFILES_STORE中。我们的Item需要具备如下定义字段。

import scrapy

class ProductItem(scrapy.Item):
    # ImagesPipeline需要的字段
    image_urls = scrapy.Field()  # 图片URL列表
    images = scrapy.Field()      # 下载后的图片信息(自动填充)

    # FilesPipeline需要的字段
    file_urls = scrapy.Field()   # 文件URL列表
    files = scrapy.Field()       # 下载后的文件信息(自动填充)

实际开发中,我们不一定会使用这两个Item Pipeline,它们的存在更像是一种参考,我们可以查阅其中的源码来实观察和学习如何自定义创建符合最佳实践的Item Pipeline。

作者:Gacfox
版权声明:本网站为非盈利性质,文章如非特殊说明均为原创,版权遵循知识共享协议CC BY-NC-ND 4.0进行授权,转载必须署名,禁止用于商业目的或演绎修改后转载。