对于我的学士论文,我尝试使用 http 连接向 kafka 发送机器数据(在这种情况下,使用 python 脚本发送的历史数据)。我正在使用在 Windows 系统上的 docker 中运行的融合平台。使用 python 脚本,我尝试将数据发送到 REST 代理。起初,我收到了关于我能够解决的数据类型的错误响应。import pandas as pdimport csv, os, json, requests, time, datetime, copy, sysif len(sys.argv) > 1: bgrfc_value = str(sys.argv[1])else: print("No arguments for bgrfc given, defaulting to 'false'") bgrfc_value = 'false'if len(sys.argv) > 2: filePath = str(sys.argv[2])else: filePath = "path"if len(sys.argv) > 3: batchSize = int(float(str(sys.argv[3])))else: batchSize = 10# Build skeleton JSONbasejson = {"message": {"meta" : "", "data": ""}}#metajson = [{'meta_key' : 'sender', 'meta_value': 'OPCR'},# {'meta_key' : 'receiver', 'meta_value': 'CAT'},# {'meta_key' : 'message_type', 'meta_value': 'MA1SEK'},# {'meta_key' : 'bgrfc', 'meta_value': bgrfc_value}]#basejson['message']['meta'] = metajsonurl = "http://127.0.0.1:8082/"headers = {'Content-Type':'application/json','Accept':'application/json'}def assign_timestamps(batch): newtimestamps = [] oldtimestamps = [] # Batch timestamps to list, add 10 newly generated timestamps to a list for item in batch['tag_tsp'].values.tolist(): newtimestamps.append(datetime.datetime.now()) oldtimestamps.append(datetime.datetime.strptime(str(item), "%Y%m%d%H%M%S.%f")) # Sort old timestamps without sorting the original array to preserve variance temp = copy.deepcopy(oldtimestamps) temp.sort() mrtimestamp = temp[0] # Replicate variance of old timestamps into the new timestamps for x in range(batchSize): diff = mrtimestamp - oldtimestamps[x] newtimestamps[x] = newtimestamps[x] - diff newtimestamps[x] = newtimestamps[x].strftime("%Y%m%d%H%M%S.%f")[:-3] # Switch old timestamps with new timestamps batch['tag_tsp'] = newtimestamps return batch该脚本发送数据,但作为响应,我得到状态代码 500。
2 回答
茅侃侃
TA贡献1842条经验 获得超21个赞
您的标头值不正确。你需要设置Accept和Content-type两个头下面给出:
Accept: application/vnd.kafka.v2+json
Content-Type : application/vnd.kafka.json.v2+json
此外,数据应按以下方式结构化:
{"records":[{"value":{<Put your json record here>}}]}
例如 :
{"records":[{"value":{"foo":"bar"}}]}
烙印99
TA贡献1829条经验 获得超13个赞
我相信您放入“value”的数据必须是字符串。像这样的事情会起作用:
{"records":[{"value":"{'foo':'bar'}"}]}
如果您在阅读主题时收到一条有趣的消息,请尝试使用 base64 编码对消息进行编码。编码后的原始 json 字符串应如下所示:
{"records":[{"value":"eyJmb28iOiJiYXIifQ=="}]}
添加回答
举报
0/150
提交
取消