为了账号安全,请及时绑定邮箱和手机立即绑定

使 Boto3 上传调用阻塞(单线程)

使 Boto3 上传调用阻塞(单线程)

慕桂英546537 2022-01-18 17:47:48
编辑:我最初的假设被证明部分错误。我在这里添加了一个冗长的答案,我邀请其他人对其进行压力测试和纠正。我正在寻找一种以单线程方式利用 Boto3 S3 API 来模拟线程安全键值存储的方法。简而言之,我想使用调用线程而不是新线程来进行上传。.upload_fileobj()据我所知, Boto3(或)中方法的默认行为.upload_file()是将任务启动到新线程并None立即返回。从文档:这是一种托管传输,如有必要,它将在多个线程中执行分段上传。(如果我对此的理解首先是错误的,那么对此进行更正也会有所帮助。这是在 Boto3 1.9.134 中。)>>> import io>>> import boto3>>> bucket = boto3.resource('s3').Bucket('my-bucket-name')>>> buf = io.BytesIO(b"test")>>> res = bucket.upload_fileobj(buf, 'testobj')>>> res is NoneTrue现在,假设这buf不是一个短的 4 字节字符串,而是一个巨大的文本 blob,它将花费不可忽略的时间来完全上传。我还使用此函数来检查具有给定键的对象是否存在:def key_exists_in_bucket(bucket_obj, key: str) -> bool:    try:        bucket_obj.Object(key).load()    except botocore.exceptions.ClientError:        return False    else:        return True如果对象按名称存在,我的意图是不重写该对象。这里的竞争条件相当明显:异步启动上传,然后快速检查key_exists_in_bucket(),False如果对象仍在写入,则返回,然后不必要地再次写入它。有没有办法确保bucket.upload_fileobj()由当前线程而不是在该方法范围内创建的新线程调用?我意识到这会减慢速度。在这种情况下,我愿意牺牲速度。
查看完整描述

3 回答

?
沧海一幻觉

TA贡献1824条经验 获得超5个赞

我认为,由于这个问题的答案和另一个类似问题的答案似乎直接冲突,所以最好直接使用pdb.


概括

boto3 默认情况下使用多个线程 (10)

但是,它不是异步的,因为它在返回之前等待(加入)这些线程,而不是使用“即发即弃”技术

因此,以这种方式,如果您尝试与来自多个客户端的 s3 存储桶通信,则读/写线程安全性就位。

细节

我在这里努力解决的一个方面是多个(子线程)并不意味着顶级方法本身是非阻塞的:如果调用线程开始上传到多个子线程,然后等待这些线程完成并返回,我敢说这仍然是一个阻塞电话。反过来asyncio说,如果方法调用是一个“即发即弃”的调用。使用threading,这实际上归结为是否x.join()曾经被调用过。


这是取自 Victor Val 的初始代码,用于启动调试器:


import io

import pdb


import boto3


# From dd if=/dev/zero of=100mb.txt  bs=50M  count=1

buf = io.BytesIO(open('100mb.txt', 'rb').read())

bucket = boto3.resource('s3').Bucket('test-threads')

pdb.run("bucket.upload_fileobj(buf, '100mb')")

此堆栈帧来自 Boto 1.9.134。


现在跳入pdb:


.upload_fileobj() 首先调用一个嵌套方法——还没有太多可看的。


(Pdb) s

--Call--

> /home/ubuntu/envs/py372/lib/python3.7/site-packages/boto3/s3/inject.py(542)bucket_upload_fileobj()

-> def bucket_upload_fileobj(self, Fileobj, Key, ExtraArgs=None,

(Pdb) s


(Pdb) l

574     

575         :type Config: boto3.s3.transfer.TransferConfig

576         :param Config: The transfer configuration to be used when performing the

577             upload.

578         """

579  ->     return self.meta.client.upload_fileobj(

580             Fileobj=Fileobj, Bucket=self.name, Key=Key, ExtraArgs=ExtraArgs,

581             Callback=Callback, Config=Config)

582     

583     

584  

所以顶级方法确实返回了一些东西,但目前还不清楚那个东西最终会变成什么None。


所以我们进入了那个。


现在,.upload_fileobj()确实有一个config参数,默认情况下是 None :


(Pdb) l 531

526     

527         subscribers = None

528         if Callback is not None:

529             subscribers = [ProgressCallbackInvoker(Callback)]

530     

531         config = Config

532         if config is None:

533             config = TransferConfig()

534     

535         with create_transfer_manager(self, config) as manager:

536             future = manager.upload(

这意味着config成为默认值TransferConfig():


use_threads-- 如果为 True,则执行 S3 传输时将使用线程。如果为 False,则不会使用线程来执行传输:所有逻辑都将在主线程中运行。

max_concurrency-- 请求执行传输的最大线程数。如果 use_threads 设置为 False,则忽略提供的值,因为传输只会使用主线程。

哇啦,他们在这里:


(Pdb) unt 534

> /home/ubuntu/envs/py372/lib/python3.7/site-packages/boto3/s3/inject.py(535)upload_fileobj()

-> with create_transfer_manager(self, config) as manager:

(Pdb) config

<boto3.s3.transfer.TransferConfig object at 0x7f1790dc0cc0>

(Pdb) config.use_threads

True

(Pdb) config.max_concurrency

10

现在我们在调用堆栈中下降一个级别以使用TransferManager(上下文管理器)。此时,max_concurrency已被用作类似名称的参数max_request_concurrency:


# https://github.com/boto/s3transfer/blob/2aead638c8385d8ae0b1756b2de17e8fad45fffa/s3transfer/manager.py#L223


    # The executor responsible for making S3 API transfer requests

    self._request_executor = BoundedExecutor(

        max_size=self._config.max_request_queue_size,

        max_num_threads=self._config.max_request_concurrency,

        tag_semaphores={

            IN_MEMORY_UPLOAD_TAG: TaskSemaphore(

                self._config.max_in_memory_upload_chunks),

            IN_MEMORY_DOWNLOAD_TAG: SlidingWindowSemaphore(

                self._config.max_in_memory_download_chunks)

        },

        executor_cls=executor_cls

    )

至少在这个 boto3 版本中,该类来自单独的库s3transfer。


(Pdb) n

> /home/ubuntu/envs/py372/lib/python3.7/site-packages/boto3/s3/inject.py(536)upload_fileobj()

-> future = manager.upload(

(Pdb) manager

<s3transfer.manager.TransferManager object at 0x7f178db437f0>

(Pdb) manager._config

<boto3.s3.transfer.TransferConfig object at 0x7f1790dc0cc0>

(Pdb) manager._config.use_threads

True

(Pdb) manager._config.max_concurrency

10

接下来,让我们进入manager.upload(). 这是该方法的全文:


(Pdb) l 290, 303

290  ->         if extra_args is None:

291                 extra_args = {}

292             if subscribers is None:

293                 subscribers = []

294             self._validate_all_known_args(extra_args, self.ALLOWED_UPLOAD_ARGS)

295             call_args = CallArgs(

296                 fileobj=fileobj, bucket=bucket, key=key, extra_args=extra_args,

297                 subscribers=subscribers

298             )

299             extra_main_kwargs = {}

300             if self._bandwidth_limiter:

301                 extra_main_kwargs['bandwidth_limiter'] = self._bandwidth_limiter

302             return self._submit_transfer(

303                 call_args, UploadSubmissionTask, extra_main_kwargs)


(Pdb) unt 301

> /home/ubuntu/envs/py372/lib/python3.7/site-packages/s3transfer/manager.py(302)upload()

-> return self._submit_transfer(

(Pdb) extra_main_kwargs

{}


(Pdb) UploadSubmissionTask

<class 's3transfer.upload.UploadSubmissionTask'>

(Pdb) call_args

<s3transfer.utils.CallArgs object at 0x7f178db5a5f8>


(Pdb) l 300, 5

300             if self._bandwidth_limiter:

301                 extra_main_kwargs['bandwidth_limiter'] = self._bandwidth_limiter

302  ->         return self._submit_transfer(

303                 call_args, UploadSubmissionTask, extra_main_kwargs)

304     

305         def download(self, bucket, key, fileobj, extra_args=None,

啊,太可爱了——所以我们至少需要再往下一层才能看到实际的底层上传。


(Pdb) s

> /home/ubuntu/envs/py372/lib/python3.7/site-packages/s3transfer/manager.py(303)upload()

-> call_args, UploadSubmissionTask, extra_main_kwargs)

(Pdb) s

--Call--

> /home/ubuntu/envs/py372/lib/python3.7/site-packages/s3transfer/manager.py(438)_submit_transfer()

-> def _submit_transfer(self, call_args, submission_task_cls,

(Pdb) s

> /home/ubuntu/envs/py372/lib/python3.7/site-packages/s3transfer/manager.py(440)_submit_transfer()

-> if not extra_main_kwargs:


(Pdb) l 440, 10

440  ->         if not extra_main_kwargs:

441                 extra_main_kwargs = {}

442     

443             # Create a TransferFuture to return back to the user

444             transfer_future, components = self._get_future_with_components(

445                 call_args)

446     

447             # Add any provided done callbacks to the created transfer future

448             # to be invoked on the transfer future being complete.

449             for callback in get_callbacks(transfer_future, 'done'):

450                 components['coordinator'].add_done_callback(callback)

好的,所以现在我们有一个TransferFuture, 定义在没有明确的证据表明线程已经被启动了,但是当涉及到期货s3transfer/futures.py 时,它肯定听起来像这样。


(Pdb) l

444             transfer_future, components = self._get_future_with_components(

445                 call_args)

446     

447             # Add any provided done callbacks to the created transfer future

448             # to be invoked on the transfer future being complete.

449  ->         for callback in get_callbacks(transfer_future, 'done'):

450                 components['coordinator'].add_done_callback(callback)

451     

452             # Get the main kwargs needed to instantiate the submission task

453             main_kwargs = self._get_submission_task_main_kwargs(

454                 transfer_future, extra_main_kwargs)

(Pdb) transfer_future

<s3transfer.futures.TransferFuture object at 0x7f178db5a780>

下面的最后一行来自TransferCoordinator课堂,乍一看似乎很重要:


class TransferCoordinator(object):

    """A helper class for managing TransferFuture"""

    def __init__(self, transfer_id=None):

        self.transfer_id = transfer_id

        self._status = 'not-started'

        self._result = None

        self._exception = None

        self._associated_futures = set()

        self._failure_cleanups = []

        self._done_callbacks = []

        self._done_event = threading.Event()  # < ------ !!!!!!

您通常会看到threading.Event 一个线程用于发出事件状态的信号,而其他线程可以等待该事件发生。


TransferCoordinator是由 .使用的TransferFuture.result()。


好的,从上面循环回来,我们现在在s3transfer.futures.BoundedExecutor它的max_num_threads属性:


class BoundedExecutor(object):

    EXECUTOR_CLS = futures.ThreadPoolExecutor

    # ...

    def __init__(self, max_size, max_num_threads, tag_semaphores=None,

                 executor_cls=None):

    self._max_num_threads = max_num_threads

    if executor_cls is None:

        executor_cls = self.EXECUTOR_CLS

    self._executor = executor_cls(max_workers=self._max_num_threads)

这基本上相当于:


from concurrent import futures


_executor = futures.ThreadPoolExecutor(max_workers=10)

但是仍然存在一个问题:这是一种“即发即弃”,还是调用实际上是在等待线程完成并返回?


似乎是后者。 .result()来电self._done_event.wait(MAXINT)。


# https://github.com/boto/s3transfer/blob/2aead638c8385d8ae0b1756b2de17e8fad45fffa/s3transfer/futures.py#L249


def result(self):

    self._done_event.wait(MAXINT)


    # Once done waiting, raise an exception if present or return the

    # final result.

    if self._exception:

        raise self._exception

    return self._result

最后,重新运行 Victor Val 的测试,这似乎证实了上述内容:


>>> import boto3

>>> import time

>>> import io

>>> 

>>> buf = io.BytesIO(open('100mb.txt', 'rb').read())

>>> 

>>> bucket = boto3.resource('s3').Bucket('test-threads')

>>> start = time.time()

>>> print("starting to upload...")

starting to upload...

>>> bucket.upload_fileobj(buf, '100mb')

>>> print("finished uploading")

finished uploading

>>> end = time.time()

>>> print("time: {}".format(end-start))

time: 2.6030001640319824

(此示例在网络优化实例上运行时,此执行时间可能更短。但 2.5 秒仍然是一个明显的大块时间,并且根本不表示线程被启动并且没有等待。)


最后,这是一个Callbackfor的示例.upload_fileobj()。它遵循文档中的示例。


首先,一个小帮手可以有效地获取缓冲区的大小:


def get_bufsize(buf, chunk=1024) -> int:

    start = buf.tell()

    try:

        size = 0 

        while True: 

            out = buf.read(chunk) 

            if out: 

                size += chunk 

            else: 

                break

        return size

    finally:

        buf.seek(start)

类本身:


import os

import sys

import threading

import time


class ProgressPercentage(object):

    def __init__(self, filename, buf):

        self._filename = filename

        self._size = float(get_bufsize(buf))

        self._seen_so_far = 0

        self._lock = threading.Lock()

        self.start = None


    def __call__(self, bytes_amount):

        with self._lock:

            if not self.start:

                self.start = time.monotonic()

            self._seen_so_far += bytes_amount

            percentage = (self._seen_so_far / self._size) * 100

            sys.stdout.write(

                "\r%s  %s of %s  (%.2f%% done, %.2fs elapsed\n" % (

                    self._filename, self._seen_so_far, self._size,

                    percentage, time.monotonic() - self.start))

            # Use sys.stdout.flush() to update on one line

            # sys.stdout.flush()

例子:


In [19]: import io 

    ...:  

    ...: from boto3.session import Session 

    ...:  

    ...: s3 = Session().resource("s3") 

    ...: bucket = s3.Bucket("test-threads") 

    ...: buf = io.BytesIO(open('100mb.txt', 'rb').read()) 

    ...:  

    ...: bucket.upload_fileobj(buf, 'mykey', Callback=ProgressPercentage("mykey", buf))                                                                                                                                                                      

mykey  262144 of 104857600.0  (0.25% done, 0.00s elapsed

mykey  524288 of 104857600.0  (0.50% done, 0.00s elapsed

mykey  786432 of 104857600.0  (0.75% done, 0.01s elapsed

mykey  1048576 of 104857600.0  (1.00% done, 0.01s elapsed

mykey  1310720 of 104857600.0  (1.25% done, 0.01s elapsed

mykey  1572864 of 104857600.0  (1.50% done, 0.02s elapsed


查看完整回答
反对 回复 2022-01-18
?
吃鸡游戏

TA贡献1829条经验 获得超7个赞

upload_fileobj接受一个 Config 参数。这是一个boto3.s3.transfer.TransferConfig对象,它又具有一个名为use_threads(默认为 true)的参数 - 如果为 True,则执行 S3 传输时将使用线程。如果为 False,则不会使用线程来执行传输:所有逻辑都将在主线程中运行。

希望这对你有用。


查看完整回答
反对 回复 2022-01-18
?
慕尼黑8549860

TA贡献1818条经验 获得超11个赞

测试该方法是否阻塞:

我自己根据经验测试了这种行为。首先,我生成了一个 100MB 的文件:


dd if=/dev/zero of=100mb.txt  bs=100M  count=1

然后我尝试以与您相同的方式上传文件并测量所花费的时间:


import boto3

import time

import io

file = open('100mb.txt', 'rb')

buf = io.BytesIO(file.read())

bucket = boto3.resource('s3').Bucket('testbucket')

start = time.time()

print("starting to upload...")

bucket.upload_fileobj(buf, '100mb')

print("finished uploading")

end = time.time()

print("time: {}".format(end-start))

upload_fileobj() 方法完成并读取下一个 python 行(1gb 文件需要 50 秒)需要 8 秒以上,所以我假设这个方法是阻塞的。


使用线程测试:


使用多个线程时,即使使用选项 use_threads=False ,我也可以验证该方法是否同时支持多个传输。我开始上传一个 200mb 的文件,然后是一个 100mb 的文件,然后 100mb 的文件首先完成。这证实了TransferConfig中的并发与多部分传输有关。


代码:


import boto3

import time

import io

from boto3.s3.transfer import TransferConfig

import threading


config = TransferConfig(use_threads=False)


bucket = boto3.resource('s3').Bucket('testbucket')

def upload(filename):

     file = open(filename, 'rb')

     buf = io.BytesIO(file.read())

     start = time.time()

     print("starting to upload file {}".format(filename))

     bucket.upload_fileobj(buf,filename,Config=config)

     end = time.time()

     print("finished uploading file {}. time: {}".format(filename,end-start))

x1 = threading.Thread(target=upload, args=('200mb.txt',))

x2 = threading.Thread(target=upload, args=('100mb.txt',))

x1.start()

time.sleep(2)

x2.start()

输出:


开始上传文件 200mb.txt

开始上传文件 100mb.txt

完成上传文件 100mb.txt。时间:46.35254502296448

完成上传文件200mb.txt。时间:61.70564889907837


使用会话进行测试:

如果您希望上传方法按照调用的顺序完成,这就是您所需要的。


代码:


import boto3

import time

import io

from boto3.s3.transfer import TransferConfig

import threading


config = TransferConfig(use_threads=False)


session = boto3.session.Session()

s3 = session.resource('s3')

bucket = s3.Bucket('testbucket')

def upload(filename):

     file = open(filename, 'rb')

     buf = io.BytesIO(file.read())

     start = time.time()

     print("starting to upload file {}".format(filename))

     bucket.upload_fileobj(buf,filename)

     end = time.time()

     print("finished uploading file {}. time: {}".format(filename,end-start))

x1 = threading.Thread(target=upload, args=('200mb.txt',))

x2 = threading.Thread(target=upload, args=('100mb.txt',))

x1.start()

time.sleep(2)

x2.start()

输出:


开始上传文件 200mb.txt

开始上传文件 100mb.txt

完成上传文件 200mb.txt。时间:46.62478971481323

完成上传文件100mb.txt。时间:50.515950202941895


我发现的一些资源:

-这是在 SO 中提出的关于阻塞或非阻塞方法的问题。这不是决定性的,但那里可能有相关信息。

- GitHub 上存在一个开放问题,允许在 boto3 中进行异步传输。

- 还有像aioboto和aiobotocore这样的工具,专门用于允许从/到 s3 和其他 aws 服务的异步下载和上传。


关于我之前的回答:

您可以在此处阅读有关 boto3 中的文件传输配置的信息。特别是:


传输操作使用线程来实现并发。可以通过将 use_threads 属性设置为 False 来禁用线程使用。


最初我认为这与同时执行的多个传输有关。但是,阅读源代码时,使用TransferConfig时参数max_concurrency中的注释解释说并发不是指多次传输,而是指 “将发出请求以执行传输的线程数”。所以这是用来加速传输的东西。use_threads属性仅用于允许多部分传输中的并发性。


查看完整回答
反对 回复 2022-01-18
  • 3 回答
  • 0 关注
  • 298 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信