-
PIL的ImageDraw提供了一系列绘图方法,让我们可以直接绘图。比如要生成字母验证码图片: import Image, ImageDraw, ImageFont, ImageFilter import random # 随机字母: def rndChar(): return chr(random.randint(65, 90)) # 随机颜色1: def rndColor(): return (random.randint(64, 255), random.randint(64, 255), random.randint(64, 255)) # 随机颜色2: def rndColor2(): return (random.randint(32, 127), random.randint(32, 127), random.randint(32, 127)) # 240 x 60: width = 60 * 4 height = 60 image = Image.new('RGB', (width, height), (255, 255, 255)) # 创建Font对象: font = ImageFont.truetype('Arial.ttf', 36) # 创建Draw对象: draw = ImageDraw.Draw(image) # 填充每个像素: for x in range(width): for y in range(height): draw.point((x, y), fill=rndColor()) # 输出文字: for t in range(4): draw.text((60 * t + 10, 10), rndChar(), font=font, fill=rndColor2()) # 模糊: image = image.filter(ImageFilter.BLUR) image.save('code.jpg', 'jpeg');查看全部
-
PIL import Image # 打开一个jpg图像文件,注意路径要改成你自己的: im = Image.open('/Users/michael/test.jpg') # 获得图像尺寸: w, h = im.size # 缩放到50%: im.thumbnail((w//2, h//2)) # 把缩放后的图像用jpeg格式保存: im.save('/Users/michael/thumbnail.jpg', 'jpeg') 比如,模糊效果也只需几行代码: import Image, ImageFilter im = Image.open('/Users/michael/test.jpg') im2 = im.filter(ImageFilter.BLUR) im2.save('/Users/michael/blur.jpg', 'jpeg')查看全部
-
html解析 from HTMLParser import HTMLParser from htmlentitydefs import name2codepoint class MyHTMLParser(HTMLParser): def handle_starttag(self, tag, attrs): print('<%s>' % tag) def handle_endtag(self, tag): print('</%s>' % tag) def handle_startendtag(self, tag, attrs): print('<%s/>' % tag) def handle_data(self, data): print('data') def handle_comment(self, data): print('<!-- -->') def handle_entityref(self, name): print('&%s;' % name) def handle_charref(self, name): print('&#%s;' % name) parser = MyHTMLParser() parser.feed('<html><head></head><body><p>Some <a href=\"#\">html</a> tutorial...<br>END</p></body></html>')查看全部
-
from xml.parsers.expat import ParserCreate class DefaultSaxHandler(object): def start_element(self, name, attrs): print('sax:start_element: %s, attrs: %s' % (name, str(attrs))) def end_element(self, name): print('sax:end_element: %s' % name) def char_data(self, text): print('sax:char_data: %s' % text) xml = r'''<?xml version="1.0"?> <ol> <li><a href="/python">Python</a></li> <li><a href="/ruby">Ruby</a></li> </ol> ''' handler = DefaultSaxHandler() parser = ParserCreate() parser.returns_unicode = True parser.StartElementHandler = handler.start_element parser.EndElementHandler = handler.end_element parser.CharacterDataHandler = handler.char_data parser.Parse(xml)查看全部
-
操作XML有两种方法:DOM和SAX。DOM会把整个XML读入内存,解析为树,因此占用内存大,解析慢,优点是可以任意遍历树的节点。SAX是流模式,边读边解析,占用内存小,解析快,缺点是我们需要自己处理事件。 正常情况下,优先考虑SAX,因为DOM实在太占内存。 在Python中使用SAX解析XML非常简洁,通常我们关心的事件是start_element,end_element和char_data,准备好这3个函数,然后就可以解析xml了。 举个例子,当SAX解析器读到一个节点时: <a href="/">python</a> 会产生3个事件: start_element事件,在读取<a href="/">时; char_data事件,在读取python时; end_element事件,在读取</a>时。查看全部
-
repeat()负责把一个元素无限重复下去,不过如果提供第二个参数就可以限定重复次数: >>> ns = itertools.repeat('A', 10) >>> for n in ns: ... print n 无限序列虽然可以无限迭代下去,但是通常我们会通过takewhile()等函数根据条件判断来截取出一个有限的序列: >>> natuals = itertools.count(1) >>> ns = itertools.takewhile(lambda x: x <= 10, natuals) >>> for n in ns: ... print n itertools提供的几个迭代器操作函数更加有用: chain()可以把一组迭代对象串联起来,形成一个更大的迭代器: for c in chain('ABC', 'XYZ'): print c # 迭代效果:'A' 'B' 'C' 'X' 'Y' 'Z' groupby()把迭代器中相邻的重复元素挑出来放在一起: >>> for key, group in itertools.groupby('AAABBBCCAAA'): ... print key, list(group) # 为什么这里要用list()函数呢? ... A ['A', 'A', 'A'] B ['B', 'B', 'B'] C ['C', 'C'] A ['A', 'A', 'A'] >>> for key, group in itertools.groupby('AaaBBbcCAAa', lambda c: c.upper()): ... print key, list(group) ... A ['A', 'a', 'a'] B ['B', 'B', 'b'] C ['c', 'C'] A ['A', 'A', 'a'] imap()和map()的区别在于,imap()可以作用于无穷序列,并且,如果两个序列的长度不一致,以短的那个为准。 >>> for x in itertools.imap(lambda x, y: x * y, [10, 20, 30], itertools.count(1)): ... print x ... 10 40 90查看全部
-
由于常用口令的MD5值很容易被计算出来,所以,要确保存储的用户口令不是那些已经被计算出来的常用口令的MD5,这一方法通过对原始口令加一个复杂字符串来实现,俗称“加盐”: 经过Salt处理的MD5口令,只要Salt不被黑客知道,即使用户输入简单口令,也很难通过MD5反推明文口令。 但是如果有两个用户都使用了相同的简单口令比如123456,在数据库中,将存储两条相同的MD5值,这说明这两个用户的口令是一样的。有没有办法让使用相同口令的用户存储不同的MD5呢? 如果假定用户无法修改登录名,就可以通过把登录名作为Salt的一部分来计算MD5,从而实现相同口令的用户也存储不同的MD5。 itertools提供的几个“无限”迭代器: >>> import itertools >>> natuals = itertools.count(1) >>> for n in natuals: ... print n ... 1 2 3 因为count()会创建一个无限的迭代器,所以上述代码会打印出自然数序列,根本停不下来,只能按Ctrl+C退出。 cycle()会把传入的一个序列无限重复下去: >>> import itertools >>> cs = itertools.cycle('ABC') # 注意字符串也是序列的一种 >>> for c in cs: ... print c ... 'A' 'B' 'C' 'A' 'B' 'C' ...查看全部
-
Python提供了一个struct模块来解决str和其他二进制数据类型的转换。 struct的pack函数把任意数据类型变成字符串: >>> import struct >>> struct.pack('>I', 10240099) '\x00\x9c@c' >表示字节顺序是big-endian,也就是网络序,I表示4字节无符号整数。 >>> struct.unpack('>IH', '\xf0\xf0\xf0\xf0\x80\x80') (4042322160, 32896) 根据>IH的说明,后面的str依次变为I:4字节无符号整数和H:2字节无符号整数。 我们以常见的摘要算法MD5为例,计算出一个字符串的MD5值: import hashlib md5 = hashlib.md5() md5.update('how to use md5 in python hashlib?') print md5.hexdigest() MD5是最常见的摘要算法,速度很快,生成结果是固定的128 bit字节,通常用一个32位的16进制字符串表示。 另一种常见的摘要算法是SHA1,调用SHA1和调用MD5完全类似: import hashlib sha1 = hashlib.sha1() sha1.update('how to use sha1 in ') sha1.update('python hashlib?') print sha1.hexdigest() SHA1的结果是160 bit字节,通常用一个40位的16进制字符串表示。查看全部
-
用记事本打开exe、jpg、pdf这些文件时,我们都会看到一大堆乱码,因为二进制文件包含很多无法显示和打印的字符,所以,如果要让记事本这样的文本处理软件能处理二进制数据,就需要一个二进制到字符串的转换方法。Base64是一种最常见的二进制编码方法。 Base64的原理很简单,首先,准备一个包含64个字符的数组: 然后,对二进制数据进行处理,每3个字节一组,一共是3x8=24bit,划为4组,每组正好6个bit: 这样我们得到4个数字作为索引,然后查表,获得相应的4个字符,就是编码后的字符串。 >>> import base64 >>> base64.b64encode('binary\x00string') 'YmluYXJ5AHN0cmluZw==' >>> base64.b64decode('YmluYXJ5AHN0cmluZw==') 'binary\x00string' 由于标准的Base64编码后可能出现字符+和/,在URL中就不能直接作为参数,所以又有一种"url safe"的base64编码,其实就是把字符+和/分别变成-和_: >>> base64.b64encode('i\xb7\x1d\xfb\xef\xff') 'abcd++//' >>> base64.urlsafe_b64encode('i\xb7\x1d\xfb\xef\xff') 'abcd--__' >>> base64.urlsafe_b64decode('abcd--__') 'i\xb7\x1d\xfb\xef\xff'查看全部
-
collections是Python内建的一个集合模块,提供了许多有用的集合类。 >>> from collections import namedtuple >>> Point = namedtuple('Point', ['x', 'y']) >>> p = Point(1, 2) >>> p.x 1 >>> p.y 2 deque是为了高效实现插入和删除操作的双向列表,适合用于队列和栈: >>> from collections import deque >>> q = deque(['a', 'b', 'c']) >>> q.append('x') >>> q.appendleft('y') >>> q deque(['y', 'a', 'b', 'c', 'x']) deque除了实现list的append()和pop()外,还支持appendleft()和popleft(),这样就可以非常高效地往头部添加或删除元素。查看全部
-
如果一个正则表达式要重复使用几千次,出于效率的考虑,我们可以预编译该正则表达式,接下来重复使用时就不需要编译这个步骤了,直接匹配: >>> import re # 编译: >>> re_telephone = re.compile(r'^(\d{3})-(\d{3,8})$') # 使用: >>> re_telephone.match('010-12345').groups() ('010', '12345') >>> re_telephone.match('010-8086').groups() ('010', '8086')查看全部
-
authkey有什么用?这是为了保证两台机器正常通信,不被其他机器恶意干扰。如果taskworker.py的authkey和taskmanager.py的authkey不一致,肯定连接不上。 python 正则表达式: 切分字符串 >>> re.split(r'[\s\,\;]+', 'a,b;; c d') ['a', 'b', 'c', 'd'] 除了简单地判断是否匹配之外,正则表达式还有提取子串的强大功能。用()表示的就是要提取的分组(Group)。比如: >>> m = re.match(r'^(\d{3})-(\d{3,8})$', '010-12345') >>> m.group(0) '010-12345' >>> m.group(1) '010' >>> m.group(2) '12345' 注意到group(0)永远是原始字符串,group(1)、group(2)……表示第1、2、……个子串。 import re #t = '19:05:30' t = '01:22:30' #m = re.match(r'^(0[0-9]|1[0-9]|2[0-3]|[0-9])\:(0[0-9]|1[0-9]|2[0-9]|3[0-9]|4[0-9]|5[0-9]|[0-9])\:(0[0-9]|1[0-9]|2[0-9]|3[0-9]|4[0-9]|5[0-9]|[0-9])$', t) m = re.match(r'^([0-2]*[0-9]+)\:([0-5]*[0-9]+)\:([0-5]*[0-9]+)$',t) print m.groups() 最后需要特别指出的是,正则匹配默认是贪婪匹配,也就是匹配尽可能多的字符。 >>> re.match(r'^(\d+)(0*)$', '102300').groups() ('102300', '') 由于\d+采用贪婪匹配,直接把后面的0全部匹配了,结果0*只能匹配空字符串了。 必须让\d+采用非贪婪匹配(也就是尽可能少匹配),才能把后面的0匹配出来,加个?就可以让\d+采用非贪婪匹配: >>> re.match(r'^(\d+?)(0*)$', '102300').groups() ('1023', '00')查看全部
-
import time, sys, Queue from multiprocessing.managers import BaseManager # 创建类似的QueueManager: class QueueManager(BaseManager): pass # 由于这个QueueManager只从网络上获取Queue,所以注册时只提供名字: QueueManager.register('get_task_queue') QueueManager.register('get_result_queue') # 连接到服务器,也就是运行taskmanager.py的机器: server_addr = '127.0.0.1' print('Connect to server %s...' % server_addr) # 端口和验证码注意保持与taskmanager.py设置的完全一致: m = QueueManager(address=(server_addr, 5000), authkey='abc') # 从网络连接: m.connect() # 获取Queue的对象: task = m.get_task_queue() result = m.get_result_queue() # 从task队列取任务,并把结果写入result队列: for i in range(10): try: n = task.get(timeout=1) print('run task %d * %d...' % (n, n)) r = '%d * %d = %d' % (n, n, n*n) time.sleep(1) result.put(r) except Queue.Empty: print('task queue is empty.') # 处理结束: print('worker exit.')查看全部
-
import random, time, Queue from multiprocessing.managers import BaseManager # 发送任务的队列: task_queue = Queue.Queue() # 接收结果的队列: result_queue = Queue.Queue() # 从BaseManager继承的QueueManager: class QueueManager(BaseManager): pass # 把两个Queue都注册到网络上, callable参数关联了Queue对象: QueueManager.register('get_task_queue', callable=lambda: task_queue) QueueManager.register('get_result_queue', callable=lambda: result_queue) # 绑定端口5000, 设置验证码'abc': manager = QueueManager(address=('', 5000), authkey='abc') # 启动Queue: manager.start() # 获得通过网络访问的Queue对象: task = manager.get_task_queue() result = manager.get_result_queue() # 放几个任务进去: for i in range(10): n = random.randint(0, 10000) print('Put task %d...' % n) task.put(n) # 从result队列读取结果: print('Try get results...') for i in range(10): r = result.get(timeout=10) print('Result: %s' % r) # 关闭: manager.shutdown()查看全部
-
计算密集型任务由于主要消耗CPU资源,因此,代码运行效率至关重要。Python这样的脚本语言运行效率很低,完全不适合计算密集型任务。对于计算密集型任务,最好用C语言编写。 IO密集型任务执行期间,99%的时间都花在IO上,花在CPU上的时间很少,因此,用运行速度极快的C语言替换用Python这样运行速度极低的脚本语言,完全无法提升运行效率。对于IO密集型任务,最合适的语言就是开发效率最高(代码量最少)的语言,脚本语言是首选,C语言最差。 现代操作系统对IO操作已经做了巨大的改进,最大的特点就是支持异步IO。如果充分利用操作系统提供的异步IO支持,就可以用单进程单线程模型来执行多任务,这种全新的模型称为事件驱动模型,Nginx就是支持异步IO的Web服务器,它在单核CPU上采用单进程模型就可以高效地支持多任务。在多核CPU上,可以运行多个进程(数量与CPU核心数相同),充分利用多核CPU。由于系统总的进程数量十分有限,因此操作系统调度非常高效。用异步IO编程模型来实现多任务是一个主要的趋势。 Python的multiprocessing模块不但支持多进程,其中managers子模块还支持把多进程分布到多台机器上。一个服务进程可以作为调度者,将任务分布到其他多个进程中,依靠网络通信。由于managers模块封装很好,不必了解网络通信的细节,就可以很容易地编写分布式多进程程序。 举个例子:如果我们已经有一个通过Queue通信的多进程程序在同一台机器上运行,现在,由于处理任务的进程任务繁重,希望把发送任务的进程和处理任务的进程分布到两台机器上。怎么用分布式进程实现? 原有的Queue可以继续使用,但是,通过managers模块把Queue通过网络暴露出去,就可以让其他机器的进程访问Queue了。 我们先看服务进程,服务进程负责启动Queue,把Queue注册到网络上,然后往Queue里面写入任务:查看全部
举报
0/150
提交
取消