题目如下:spam系统,假设我们可以获得线上的实时请求(按时间顺序)每个请求包含如下信息:时间(unix时间戳)用户名动作(提问、回答、评论)内容依次考虑如何解决以下问题:1.当发现动作频率较高的用户时,输出报警(1分钟内超过10次评论、回答、提问)2.当发现一个用户连续发重复的内容时,输出报警(连续发了10个相同的评论、回答、提问)3.使用你觉得最优美的方法将上面的策略与程序分离开。使得上面的策略可以随时更新而不用修改程序。要求:服务监听一个端口,使用测试程序模拟用户行为不断向服务发送请求,请求格式为json,如:{"time":1323333"user":"xxx","action":"answer","content":"abcde"}服务输出报警格式如下xxx,"频繁提问"一下是我的代码:client.pyimportsocketimportjsonimporttimeimportgeventimportrandomdefgenerateContents():"""随机生成内容"""chars=[chr(c)forcinrange(ord('a'),ord('z')+1)]result=[''.join(random.sample(chars,5))foriinrange(3)]returnresultclassWorker():def__init__(self,name,socket):self.name=nameself.socket=socketself.actions=['question','answer','comment']self.contents=generateContents()def__call__(self):while1:data=self._generateData()data=json.dumps(data)self._request(data)def_request(self,data):s=self.socket.socket()socket=self.socket.gethostname()port=1234s.connect((socket,port))s.recv(1024)s.send(data)s.close()gevent.sleep(random.randint(1,3)*0.5)def_generateData(self):data={}data['time']=time.time()data['user']=self.namedata['action']=random.choice(self.actions)data['content']=random.choice(self.contents)returndataif__name__=='__main__':threads=[gevent.spawn(Worker(chr(i),socket))foriinrange(ord('a'),ord('f'))]gevent.joinall(threads)server.pyimportsocketimportjsonfromfunctoolsimportwrapsfromMyExamineimportMyExaminedefrun():s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)host=''port=1234s.bind((host,port))s.listen(10)while1:c,addr=s.accept()c.send('Thisisasimpleserver'.encode('utf-8'))rec=c.recv(1024)do_something(rec)c.close()defformat2Json(func):deff(*args):data=json.loads(args[0])func(data)returnf@format2Json@MyExaminedefdo_something(data):"""模拟后台处理用户发来的请求"""printdataif__name__=='__main__':run()MyExamine.pyfrombaseExamineimportBaseExaminefromcollectionsimportdequeclassMyExamine(BaseExamine):def__init__(self,func):self.rateCache={}self.timesCache={}self.func=funcdefexamineRate(self,data):userQueue=self.rateCache.setdefault(data['user'],deque())whileuserQueue:item=userQueue.pop()ifdata['time']-item['time']10:self.showWarning('User:%s频繁操作'.decode('utf8')%data['user'])defexamineContentTimes(self,data):userActions=self.timesCache.setdefault(data['user'],{})contentTimes=self.timesCache.setdefault(data['action'],{})key=data['content']contentTimes[key]=contentTimes.get(key,0)+1ifcontentTimes[key]>10:self.showWarning('User:%s%s%s超过10次'.decode('utf8')%(data['user'],data['action'],key))defshowWarning(self,msg):print'\033[;31m'+msg+'\033[0m'def__call__(self,*args):data=args[0]self.examineRate(data)self.examineContentTimes(data)self.func(*args)baseExamine.pyclassBaseExamine():"""模板方法的基类"""def__init__(self,func):passdef__call__(self,*args):pass求大神指点,谢谢!
2 回答
qq_花开花谢_0
TA贡献1835条经验 获得超7个赞
这个没有最佳解法吧,看个人理解……下面是我的理解:1.不同的策略分为不同的类,提供一个统一的接口,比如#strategy.pyclassUserRate(object):def__init__(self,comment_per_user_min=10):#initdefcheck(self,userdata):#检查用户数据,超过限制即报警classUserDupContent(object):def__init__(self,content_send_per_user=10):#initdefcheck(self,userdata):#检查用户数据2.使用依赖注入将策略注入到检查程序:classGuarder(object):defaddStrategy(self,strategy):#添加一个策略defcheck(self,userdata):#使用已经添加的策略逐个检查#返回检查结果defreload_from(self,conf):#解析配置并创建相应对象self.addStrategy(strategy)@classmethoddefcreate(cls,conf=''):obj=cls()ifconf:obj.reload_from(conf)3.调用Guarder实例检查guarder=Guarder.create('antispam.ini')defindex():ifguarder.check(userdata):passelse:#errordefadmin_reload_guarder():'''根据web请求运行时重载配置'''importstrategyreload(strategy)guarder.reload(conf)示例配置文件:#antispam.ini[strategies]inst=usercontent,userrate[usercontent]type=strategy.UserDupContentinit.comment_per_user_min=5[userrate]type=strategy.UserRateinit.content_send_per_user=5以上能够完成的功能如下:1.隔离了策略代码,使用依赖注入的方式完成2.策略本身是ducktyping,灵活扩充3.策略代码文件strategy.py不停止服务器热部署当然,配置文件可以调整格式,直接用python代码写都可以
炎炎设计
TA贡献1808条经验 获得超4个赞
关于“最优美的方法将上面的策略与程序分开”:引入轻量级的信号/事件机制,降低耦合度。更进一步,使用诸如rabbitmq这样的消息队列,策略和程序可以分开布署。关于这个问题实现上的优化:使用类似redis这样的服务,写入自动过期的数据,只要检查相应用户的数据数量是不是超过阈值即可。这样系统在多台设备上负载均衡也方便。
添加回答
举报
0/150
提交
取消