为了账号安全,请及时绑定邮箱和手机立即绑定

Python的一道题以及我的代码,求大神看看有什么可以改进的地方

Python的一道题以及我的代码,求大神看看有什么可以改进的地方

青春有我 2019-04-07 09:37:42
题目如下:spam系统,假设我们可以获得线上的实时请求(按时间顺序)每个请求包含如下信息:时间(unix时间戳)用户名动作(提问、回答、评论)内容依次考虑如何解决以下问题:1.当发现动作频率较高的用户时,输出报警(1分钟内超过10次评论、回答、提问)2.当发现一个用户连续发重复的内容时,输出报警(连续发了10个相同的评论、回答、提问)3.使用你觉得最优美的方法将上面的策略与程序分离开。使得上面的策略可以随时更新而不用修改程序。要求:服务监听一个端口,使用测试程序模拟用户行为不断向服务发送请求,请求格式为json,如:{"time":1323333"user":"xxx","action":"answer","content":"abcde"}服务输出报警格式如下xxx,"频繁提问"一下是我的代码:client.pyimportsocketimportjsonimporttimeimportgeventimportrandomdefgenerateContents():"""随机生成内容"""chars=[chr(c)forcinrange(ord('a'),ord('z')+1)]result=[''.join(random.sample(chars,5))foriinrange(3)]returnresultclassWorker():def__init__(self,name,socket):self.name=nameself.socket=socketself.actions=['question','answer','comment']self.contents=generateContents()def__call__(self):while1:data=self._generateData()data=json.dumps(data)self._request(data)def_request(self,data):s=self.socket.socket()socket=self.socket.gethostname()port=1234s.connect((socket,port))s.recv(1024)s.send(data)s.close()gevent.sleep(random.randint(1,3)*0.5)def_generateData(self):data={}data['time']=time.time()data['user']=self.namedata['action']=random.choice(self.actions)data['content']=random.choice(self.contents)returndataif__name__=='__main__':threads=[gevent.spawn(Worker(chr(i),socket))foriinrange(ord('a'),ord('f'))]gevent.joinall(threads)server.pyimportsocketimportjsonfromfunctoolsimportwrapsfromMyExamineimportMyExaminedefrun():s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)host=''port=1234s.bind((host,port))s.listen(10)while1:c,addr=s.accept()c.send('Thisisasimpleserver'.encode('utf-8'))rec=c.recv(1024)do_something(rec)c.close()defformat2Json(func):deff(*args):data=json.loads(args[0])func(data)returnf@format2Json@MyExaminedefdo_something(data):"""模拟后台处理用户发来的请求"""printdataif__name__=='__main__':run()MyExamine.pyfrombaseExamineimportBaseExaminefromcollectionsimportdequeclassMyExamine(BaseExamine):def__init__(self,func):self.rateCache={}self.timesCache={}self.func=funcdefexamineRate(self,data):userQueue=self.rateCache.setdefault(data['user'],deque())whileuserQueue:item=userQueue.pop()ifdata['time']-item['time']10:self.showWarning('User:%s频繁操作'.decode('utf8')%data['user'])defexamineContentTimes(self,data):userActions=self.timesCache.setdefault(data['user'],{})contentTimes=self.timesCache.setdefault(data['action'],{})key=data['content']contentTimes[key]=contentTimes.get(key,0)+1ifcontentTimes[key]>10:self.showWarning('User:%s%s%s超过10次'.decode('utf8')%(data['user'],data['action'],key))defshowWarning(self,msg):print'\033[;31m'+msg+'\033[0m'def__call__(self,*args):data=args[0]self.examineRate(data)self.examineContentTimes(data)self.func(*args)baseExamine.pyclassBaseExamine():"""模板方法的基类"""def__init__(self,func):passdef__call__(self,*args):pass求大神指点,谢谢!
查看完整描述

2 回答

?
qq_花开花谢_0

TA贡献1835条经验 获得超7个赞

这个没有最佳解法吧,看个人理解……下面是我的理解:1.不同的策略分为不同的类,提供一个统一的接口,比如#strategy.py
classUserRate(object):
def__init__(self,comment_per_user_min=10):
#init
defcheck(self,userdata):
#检查用户数据,超过限制即报警
classUserDupContent(object):
def__init__(self,content_send_per_user=10):
#init
defcheck(self,userdata):
#检查用户数据2.使用依赖注入将策略注入到检查程序:classGuarder(object):
defaddStrategy(self,strategy):
#添加一个策略
defcheck(self,userdata):
#使用已经添加的策略逐个检查
#返回检查结果
defreload_from(self,conf):
#解析配置并创建相应对象
self.addStrategy(strategy)
@classmethod
defcreate(cls,conf=''):
obj=cls()
ifconf:
obj.reload_from(conf)3.调用Guarder实例检查guarder=Guarder.create('antispam.ini')
defindex():
ifguarder.check(userdata):
pass
else:
#error
defadmin_reload_guarder():
'''根据web请求运行时重载配置'''
importstrategy
reload(strategy)
guarder.reload(conf)示例配置文件:#antispam.ini
[strategies]
inst=usercontent,userrate
[usercontent]
type=strategy.UserDupContent
init.comment_per_user_min=5
[userrate]
type=strategy.UserRate
init.content_send_per_user=5以上能够完成的功能如下:
1.隔离了策略代码,使用依赖注入的方式完成
2.策略本身是ducktyping,灵活扩充
3.策略代码文件strategy.py不停止服务器热部署当然,配置文件可以调整格式,直接用python代码写都可以
                            
查看完整回答
反对 回复 2019-04-07
?
炎炎设计

TA贡献1808条经验 获得超4个赞

关于“最优美的方法将上面的策略与程序分开”:引入轻量级的信号/事件机制,降低耦合度。更进一步,使用诸如rabbitmq这样的消息队列,策略和程序可以分开布署。关于这个问题实现上的优化:使用类似redis这样的服务,写入自动过期的数据,只要检查相应用户的数据数量是不是超过阈值即可。这样系统在多台设备上负载均衡也方便。
                            
查看完整回答
反对 回复 2019-04-07
  • 2 回答
  • 0 关注
  • 270 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信