目前,我正在使用来自组织内 RabbitMQ 队列的消息。每天,我需要将所有收到的消息推送到一个csv中,该csv最终将作为Datawarehouse中的表登陆。代码总是在监听队列,理想情况下,我希望将数据流式传输到csv。#callback funtion on receiving messagesdef onMessage(channel, method, properties, body): print(body)while True: try: #connect credentials = pika.PlainCredentials(username, password) connection = pika.BlockingConnection(pika.ConnectionParameters(host = server, port = port, virtual_host = vhost, credentials = credentials))channel = connection.channel()channel.basic_consume(on_message_callback = onMessage, queue = queueName, auto_ack = True) channel.start_consuming()开始使用队列后收到的输出如下所示:这是收到的一行数据。它基本上返回一个json对象,但是b'{“metrics”:在使用json对象时需要删除。b'{“metrics”:[{“ci_id”:“SPN-EQSHATA1”,“client_id”:“39956e6fdb256757567567567433333193a”,“name”:“deviceHealthScore”,“source_id”:“Global”,“source_management_platform”:“XXX”,“timestamp”:1582886099642,“unit”:client_id ci_id“configAssuranceScore”,“source_id”:“Global”,“source_management_platform”:“XXX”,“timestamp”:1582886099325,“unit”:“count”,“value”:“1.0”},{“ci_id”:”SPN-EQSHATA1“,”client_id“:”39956e6fdb25675756756743333193a“,”name“:”imageAssuranceScore“,”source_id“:”Global“,”source_management_platform“:”XXX“,”timestamp“:1582886099325,”unit“:”count“,”count“,”value“:”1.0“},{”ci_id“:”SPN-EQSHATA1“,”client_id“:”39956e6fdb256757567567567433333193a“,”name“:”vulnerabilityAssuranceScore“,”source_id“:”Global“,”source_management_platform“:”XXX“,”timestamp“:1582886099325,”unit“:”count“,”value“:”10.0“},{”ci_id“:”SPN-EQSHATA1“,”client_id“:”39956e6fdb256757567567433333193a“,”name“:”overallAssuranceScore“,”source_id“:”Global“,”source_management_platform“:”XXX“,”timestamp“:1582886099642,”unit“:”count“,”value“:”5.5“}],”emr_published_on“:1582886099642}'
1 回答
炎炎设计
TA贡献1808条经验 获得超4个赞
b'...'
只是意味着你得到了一个json模块可以愉快地处理的字节字符串。您将获得一个字典,对于键,它具有字典列表的值。该列表可以直接馈送数据帧。metrics
这意味着您可以像以下方式一样简单地处理它:
df = pd.DataFrame(json.loads(body)['metrics'])
添加回答
举报
0/150
提交
取消