pipeline相关知识
-
Redis pipeline关于 Redis pipeline 为什么需要 pipeline ? Redis 的工作过程是基于 请求/响应 模式的。正常情况下,客户端发送一个命令,等待 Redis 应答;Redis 接收到命令,处理后应答。请求发出到响应的时间叫做往返时间,即 RTT(Round Time Trip)。在这种情况下,如果需要执行大量的命令,就需要等待上一条命令应答后再执行。这中间不仅仅多了许多次 RTT,而且还频繁的调用系统 IO,发送网络请求。为了提升效率,pipeline 出现了,它允许客户端可以一次发送多条命令,而不等待上一条
-
Jenkins Pipeline 动态使用分支名,触类旁通前言 在上一篇 Jenkins 使用环境变量]中,帮助大家使用一条 Docker 命令就可以快速玩转 Jenkins,同时用最简单的方式解释了 Jenkins 中让人混乱的环境变量,本文还是接着变量说点事情 一般成熟的项目流程都会通过 Jenkins Pipeline 来做 CI 部分,在默认 Jenkins 环境配置中,Jenkins Pipeline 分为两种: Pipeline (单分支 Pipeline) Multibranch Pipeline (多分支 Pipeline) 如下图: 如果使用了多分支 Pipeline,就不会存在动态使用分支名称的问题了。如果你
-
Flink:Data Pipeline应用Apache Flink由于其广泛的特性,是开发和运行许多不同类型应用的优秀选择。Flink的特性包括对流和批处理的支持、复杂的状态管理、event-time处理语义、以及exactly-once保证。此外,Flink可以部署在各种资源管理平台上,例如Yarn、Mesos和Kubernetes,也可以作为一个standalone的集群。Flink具有高可用性,没有单点故障的情况。Flink已经被证明可以扩展到数千个核和TB级的应用,并提供高吞吐量和低延迟。 下面,我们将探讨Flink支持的最常见的应用类型:(1)Data Pipeline Applications(2)Data Analytics Applications(3)Event-driven Applications一.Data Pipeline Applications1.什么是data pipelines? ETL(抽取-转换-加载)是在存储系统间转换和移动数据的一种常用方法。通常,ETL任务会定期的触发,将
-
Netty源码阅读入门实战(六)-pipeline1 pipeline概述2 pipeline初始化双向链表结构看看其一个实现类基本数据结构组件3 添加ChannelHandler先看看用户代码用户方自定义实现即可4 未下载,失败 ing6 outBound事件的传播同理以后的过程7 异常的传播对应 tail 为当前节点的情形最佳实践在最后定义异常处理器,否则默认是 tail 节点的警告打印信息所有异常最终处理位置8 pipeline总结调用 pipeline 添加节点时,netty 会使用 instanceof 关键字判断当前节点是 inboound 还是 outbound 类型,分别用不同的 boolean 类型变量标识inbound 事件类型顺序正相关outbound 逆相关异常处理器要么从 head 或者 tail 节点开始传播inbound事件则从当前节点开始传递到最后节点outbound事件则从当前节点开始传递 到第一个 outbound节点作者:芥末无疆sss链接:https://www.jianshu.com/p/0d43166
pipeline相关课程
pipeline相关教程
- 1. Scrapy 中的 Pipeline 介绍 Pipeline 的中文意思是管道,类似于工厂的流水线那样。Scrapy 中的 Pipeline 通常是和 Items 联系在一起的,其实就是对 Items 数据的流水线处理。 一般而言,Pipeline 的典型应用场景如下:数据清洗、去重;验证数据的有效性;按照自定义格式保存数据;存储到合适的数据库中 (如 MySQL、Redis 或者 MongoDB);通过前面的 Scrapy 架构图可知,Pipeline 位于 Scrapy 数据处理流程的最后一步,但是它也不是必须,Pipeline 默认处于关闭状态。如果需要的话,我们只需要在 settings.py 中设置 ITEM_PIPELINES 属性值即可。它是一个数组值,我们可以定义多个 Item Pipeline,并且在 ITEM_PIPELINES 中设置相应 Pipeline 的优先级。这样 Scrapy 会依次处理这些 Pipelines,最后达到我们想要的效果。item 经过 pipelines 处理注意:上面的 pipeline 顺序和功能都可以任意调整,保证逻辑性即可。比如有一个去重的 pipeline 和保存到数据库的 pipeline,那么去重的 pipeline 一定要在保存数据库之前,这样保存的就是不重复的数据。
- 深入理解 Scrapy 的 Pipeline 今天我们来深入学习 Scrapy 框架 Pipeline 的工作原理。这一次我们采取一种新的学习方式:先提出疑问,然后从源码中进行解答,直到最后我们彻底搞清楚 Pipeline 的工作流程。
- 2. 如何编写自己的 Item Pipeline 编写自己的 Item Pipeline 非常简单,我们只需要编写一个简单的类,实现四个特定名称的方法即可 (部分方法非必须)。我们来简单说明下这三个方法:open_spider(spider):非必需,参数 spider 即被关闭的 Spider 对象。这个方法是 MiddlewareManager 类中的方法,在 Spider 开启时被调用,主要做一些初始化操作,如连接数据库、打开要保存的文件等;close_spider(spider):非必需,参数 spider 即被关闭的 Spider 对象。这个方法也是 MiddlewareManager 类中的方法,在 Spider 关闭时被调用,主要做一些如关闭数据库连接、关闭打开的文件等操作;from_crawler(cls, crawler):非必需,在 Spider启用时调用,且早于 open_spider() 方法。这个方法我们很少去重载,可以不用;process_item(item, spider):必须实现。该函数有两个参数,一个是表示被处理的 Item 对象,另一个是生成该 Item 的 Spider 对象。定义的 Item pipeline 会默认调用该方法对 Item 进行处理,这也是 Pipeline 的工作核心;完成这样一个 Item Pipeline 后,将该类的路径地址添加到 settings.py 中的 ITEM_PIPELINES 中即可。下图是我们一个简单项目完成的两个 pipelines。一个简单项目的 pipelines 示例
- 2.5 scrapy-redis 中的 Item Pipeline 最后我们来看 scrapy-redis 中定义的 item pipeline。前面我们在头条新闻爬虫的改造中只是在配置中添加了 scrapy-redis 中的 item pipeline,这样爬虫抓取的结果会保存到 redis 中,那么该 pipeline 是如何实现的呢?其代码位于 scrapy_redis/pipelines.py 文件中,我们来一览究竟:# 源码位置:scrapy_redis/pipelines.py# ...class RedisPipeline(object): def __init__(self, server, key=defaults.PIPELINE_KEY, serialize_func=default_serialize): """Initialize pipeline. Parameters ---------- server : StrictRedis Redis client instance. key : str Redis key where to store items. serialize_func : callable Items serializer function. """ self.server = server self.key = key self.serialize = serialize_func @classmethod def from_settings(cls, settings): params = { 'server': connection.from_settings(settings), } if settings.get('REDIS_ITEMS_KEY'): params['key'] = settings['REDIS_ITEMS_KEY'] if settings.get('REDIS_ITEMS_SERIALIZER'): params['serialize_func'] = load_object( settings['REDIS_ITEMS_SERIALIZER'] ) return cls(**params) @classmethod def from_crawler(cls, crawler): return cls.from_settings(crawler.settings) def process_item(self, item, spider): return deferToThread(self._process_item, item, spider) def _process_item(self, item, spider): key = self.item_key(item, spider) data = self.serialize(item) self.server.rpush(key, data) return item def item_key(self, item, spider): """Returns redis key based on given spider. Override this function to use a different key depending on the item and/or spider. """ return self.key % {'spider': spider.name}这段代码也是简洁明了,首先是初始化三个属性值:server:redis 客户端实例,用于对 redis 进行操作;key:结果保存到 redis 中的 key 名;serialize:指定结果序列化类;作为 scrapy 中的 pipeline,最核心的处理函数就是 process_item() 方法。在该 pipeline 中,该方法只有一条语句:deferToThread(self._process_item, item, spider)deferToThread() 方法是 Twisted 框架提供的一个方法,其含义如下:Run a function in a thread and return the result as a Deferred其实就是开启一个线程执行相应的方法,并将结果作为一个 Deferred 返回。我们并不关心这个 Deferred 是啥,在最后一部分源码篇中会介绍到,这里我们只关心处理 item 的操作是 self._process_item() 这个方法。该方法的逻辑非常简单明了:生成保存到 Redis 中的 key;将 item 值序列化以便能保存到 Redis 中;调用 redis 的 rpush() 方法将序列化结果保存到相应列表中;看完了 scrapy-redis 中 RedisPipeline 的代码,是不是知道为什么结果会保存到 redis 中了吧?就这样,我们几乎学完了 scrapy-redis 插件的全部源码,下来我们来看一看 scrapy-redis 插件的架构图,进一步理解该插件:scrapy-redis 插件架构图
- 2.1 Item Pipeline 的管理器类 还记得上一小节我们追踪 Spider 中间件的代码时,在 scrapy/core/scraper.py 中找到了 Spider 中间件处理 Spider 模块返回结果的方法,其代码内容如下:# 源码位置:scrapy/core/scraper.py# ...class Scraper: # ... def _process_spidermw_output(self, output, request, response, spider): """Process each Request/Item (given in the output parameter) returned from the given spider """ if isinstance(output, Request): # 如果spider中间件返回的是Request,则继续调用引擎去处理请求 self.crawler.engine.crawl(request=output, spider=spider) elif is_item(output): # 如果spider中间件返回的是item,则调用self.itemproc对象的process_item()方法处理 self.slot.itemproc_size += 1 dfd = self.itemproc.process_item(output, spider) dfd.addBoth(self._itemproc_finished, output, response, spider) return dfd elif output is None: pass else: # 打印错误日志 # ...从上面的代码我们知道,对于 Spider 中间件模块最后返回的 Item 类型数据会调用 self.itemproc 对象的 process_item() 方法处理,那么这个 self.itemproc 对象是什么呢?找到 Scraper 类的 __init__() 方法:# 源码位置:scrapy/core/scraper.py# ...class Scraper: def __init__(self, crawler): # ... itemproc_cls = load_object(crawler.settings['ITEM_PROCESSOR']) self.itemproc = itemproc_cls.from_crawler(crawler) # ... # ...来看默认的配置中关于 ITEM_PROCESSOR 的值,如下:# 源码位置: scrapy/settings/default_settings.py# ...ITEM_PROCESSOR = 'scrapy.pipelines.ItemPipelineManager'# ...单看这个类的名称,又是一个某某管理器类,前面我们学了下载中间件管理类、Spider 中间件管理类,分别管理下载中间件类以及 Spider 中间件类,维护所属类方法的处理顺序。这里我们也是需要一个同样功能的管理类,来保证依次处理相应的 Item pipelines。我们进入该管理器类,阅读其实现代码:# 源码位置:scrapy/from scrapy.middleware import MiddlewareManagerfrom scrapy.utils.conf import build_component_listfrom scrapy.utils.defer import deferred_f_from_coro_fclass ItemPipelineManager(MiddlewareManager): component_name = 'item pipeline' @classmethod def _get_mwlist_from_settings(cls, settings): return build_component_list(settings.getwithbase('ITEM_PIPELINES')) def _add_middleware(self, pipe): super(ItemPipelineManager, self)._add_middleware(pipe) if hasattr(pipe, 'process_item'): self.methods['process_item'].append(deferred_f_from_coro_f(pipe.process_item)) def process_item(self, item, spider): return self._process_chain('process_item', item, spider)同样,这个管理类直接就继承了前面的中间件管理器类,其代码量非常少,十分容易理解。首先它和所有的中间件管理类一样从全局配置中获的对应管理的 pipelines,这个配置正是 ITEM_PIPELINES。其次,注意到这个 _add_middleware() 方法中有个调用父类的 _add_middleware() 方法,而父类中该方法的代码如下:# 源码位置: scrapy/middleware.py# ...class MiddlewareManager: # ... def _add_middleware(self, mw): if hasattr(mw, 'open_spider'): self.methods['open_spider'].append(mw.open_spider) if hasattr(mw, 'close_spider'): self.methods['close_spider'].appendleft(mw.close_spider)我们从而得知,在 pipeline 中会将 open_spider()、close_spider() 以及 process_item() 方法加入到对应的处理链中,且 MiddlewareManager 类中 from_crawler() 是一个类方法,因此对于继承该类的子类也同样会有该方法,也即具备了通过 Crawler 类对象实例化的能力。
- 1. 问题描述 这一小节我们将从源码的角度来分析 Pipeline 的工作过程。现在我先提出几个疑问:Scrapy 框架中使用 Pipeline 处理 Item 的代码在哪里?为什么我在 settings.py 中设置了 ITEM_PIPELINES 属性值,Scrapy 就能将其作为 Pipeline 去处理对应 Spider 生成的 Item 呢?定义 Pipeline 的那四个方法来自哪里?为什么一定需要 process_item() 方法?第12节中抓取起点月票榜小说时用到了图片管道,该管道的一个详细的处理流程是怎样的,即它如何实现图片下载?带着这些疑问,我们来进入源码中寻找答案。
pipeline相关搜索
-
pack
package
package文件
padding
pages
page对象
panda
panel
panel控件
param
parameter
parcel
parent
parentnode
parents
parse
parse error
parseint
partition
pascal