Answers

Answer 1:
The Twitter v2 API includes an endpoint for random sampling as well as an endpoint for filtering tweets.
import requests
import os
import json
import pandas as pd

# To set your environment variable in your terminal run the following line:
# export 'BEARER_TOKEN'='<your_bearer_token>'

data = []
counter = 0


def create_headers(bearer_token):
    headers = {"Authorization": "Bearer {}".format(bearer_token)}
    return headers


def get_rules(headers, bearer_token):
    response = requests.get(
        "https://api.twitter.com/2/tweets/search/stream/rules", headers=headers
    )
    if response.status_code != 200:
        raise Exception(
            "Cannot get rules (HTTP {}): {}".format(response.status_code, response.text)
        )
    print(json.dumps(response.json()))
    return response.json()


def delete_all_rules(headers, bearer_token, rules):
    if rules is None or "data" not in rules:
        return None
    ids = list(map(lambda rule: rule["id"], rules["data"]))
    payload = {"delete": {"ids": ids}}
    response = requests.post(
        "https://api.twitter.com/2/tweets/search/stream/rules",
        headers=headers,
        json=payload,
    )
    if response.status_code != 200:
        raise Exception(
            "Cannot delete rules (HTTP {}): {}".format(
                response.status_code, response.text
            )
        )
    print(json.dumps(response.json()))


def set_rules(headers, delete, bearer_token):
    # You can adjust the rules if needed
    sample_rules = [
        {"value": "dog has:images", "tag": "dog pictures"},
        {"value": "cat has:images -grumpy", "tag": "cat pictures"},
    ]
    payload = {"add": sample_rules}
    response = requests.post(
        "https://api.twitter.com/2/tweets/search/stream/rules",
        headers=headers,
        json=payload,
    )
    if response.status_code != 201:
        raise Exception(
            "Cannot add rules (HTTP {}): {}".format(response.status_code, response.text)
        )
    print(json.dumps(response.json()))


def get_stream(headers, set, bearer_token):
    global data, counter
    response = requests.get(
        "https://api.twitter.com/2/tweets/search/stream", headers=headers, stream=True,
    )
    print(response.status_code)
    if response.status_code != 200:
        raise Exception(
            "Cannot get stream (HTTP {}): {}".format(
                response.status_code, response.text
            )
        )
    for response_line in response.iter_lines():
        if response_line:
            json_response = json.loads(response_line)
            print(json.dumps(json_response, indent=4, sort_keys=True))
            data.append(json_response['data'])
            # Write the collected tweets to disk in batches of 100
            if len(data) % 100 == 0:
                print('storing data')
                pd.read_json(json.dumps(data), orient='records').to_json(
                    f'tw_example_{counter}.json', orient='records')
                data = []
                counter += 1


def main():
    bearer_token = os.environ.get("BEARER_TOKEN")
    headers = create_headers(bearer_token)
    rules = get_rules(headers, bearer_token)
    delete = delete_all_rules(headers, bearer_token, rules)
    set = set_rules(headers, delete, bearer_token)
    get_stream(headers, set, bearer_token)


if __name__ == "__main__":
    main()
Then load the data into a pandas dataframe with df = pd.read_json('tw_example_0.json', orient='records') (the stream writes one file per batch of 100 tweets, named tw_example_<counter>.json).
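The script above only uses the filtered stream. For the random-sampling side mentioned at the start, the v2 API also exposes a sampled stream endpoint (GET https://api.twitter.com/2/tweets/sample/stream) that returns roughly a 1% sample of public tweets. A minimal sketch, reusing create_headers from the script above (only the URL differs from get_stream; the batching-to-disk logic is omitted):

def get_sampled_stream(headers):
    # Connect to the v2 sampled stream (~1% random sample of public tweets)
    response = requests.get(
        "https://api.twitter.com/2/tweets/sample/stream", headers=headers, stream=True,
    )
    if response.status_code != 200:
        raise Exception(
            "Cannot get sampled stream (HTTP {}): {}".format(
                response.status_code, response.text
            )
        )
    for response_line in response.iter_lines():
        if response_line:
            # Each non-empty line is one JSON-encoded tweet payload
            yield json.loads(response_line)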
Answer 2:
I suggest reading the tweepy API documentation.
From reading other code snippets, I believe it should be done like this:
stream.filter(track=['Keyword'])
print(stream.sample())
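This snippet assumes a stream object already exists. A minimal sketch of how it might be set up with the tweepy 3.x API (StreamListener was removed in tweepy 4.0); the credentials below are placeholders you must fill in:

import tweepy

class MyListener(tweepy.StreamListener):
    def on_status(self, status):
        # Handle each incoming tweet here
        print(status.text)

# Placeholders: fill in your own app credentials
auth = tweepy.OAuthHandler("<consumer_key>", "<consumer_secret>")
auth.set_access_token("<access_token>", "<access_token_secret>")
stream = tweepy.Stream(auth=auth, listener=MyListener())

# Either filter by keyword ...
stream.filter(track=['Keyword'])
# ... or take the random sample stream instead:
# stream.sample()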
Answer 3:
As I understand it, tweepy uses the Twitter v1.1 API, which has separate endpoints for sampling and for filtering tweets in real time. See the Twitter API reference: v1 realtime sampling and v1 realtime filtering.
Approach 1: You can use methods such as stream.filter(track=['Keyword1', 'keyword2']) to get the filtered stream data, and then sample records from the collected data.
import tweepy

class StreamListener(tweepy.StreamListener):
    def on_status(self, status):
        # do data processing and storing here
        pass
Approach 2: You can write a program that starts and stops the stream at random intervals (for example, sampling a random 3-minute window out of every 15 minutes); a rough sketch follows after Approach 3.
Approach 3: You can use the sampling API to collect data and then apply keyword filtering to store only the relevant records.
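A rough sketch of Approach 2, assuming an already-configured tweepy 3.x Stream object like the one in the previous answer (is_async and disconnect() belong to the tweepy 3.x Stream API; the interval values and keywords are just placeholders):

import random
import time

def sample_in_windows(stream, cycle_seconds=15 * 60, window_seconds=3 * 60):
    # stream is an already-configured tweepy.Stream instance
    while True:
        # Pick a random start offset inside the current cycle
        offset = random.uniform(0, cycle_seconds - window_seconds)
        time.sleep(offset)
        # Run the filtered stream in a background thread for one window
        stream.filter(track=['Keyword1', 'keyword2'], is_async=True)
        time.sleep(window_seconds)
        stream.disconnect()
        # Wait out the rest of the cycle before the next window
        time.sleep(cycle_seconds - window_seconds - offset)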