为了账号安全,请及时绑定邮箱和手机立即绑定

如何将数据发送到 EventHub 设置 PartitionId 而不是 PartitionKey

如何将数据发送到 EventHub 设置 PartitionId 而不是 PartitionKey

料青山看我应如是 2022-01-18 17:38:50
我在 Microsoft Docs 看到有一种方法可以通过设置 PartitionId 而不是 PartitionKey(使用 C#)将数据发送到我想要的分区。CreatePartitionSender(String) 创建一个可以将 EventData 直接发布到特定 EventHub 分区的 PartitionSender。但是,我在 Python 中找不到相同的内容。有什么可用的方法吗?
查看完整描述

2 回答

?
慕后森

TA贡献1802条经验 获得超5个赞

有两种方法可以将数据发送到 Azure 事件中心,即 HTTP REST API 和 AMQP 1.0 协议。

对于使用 HTTP REST API 或Azure EventHub Python 客户端库,只有partitionId参数支持向 Event Hub 中的指定分区发送新事件,如下两个。

  1. REST APISend partition event需要partitionIdendpoint中的参数https://{servicebusNamespace}.servicebus.windows.net/{eventHubPath}/partitions/{partitionId}/messages,它是唯一一个支持发送分区特性的REST API。

  2. 的源代码注释Sender.py解释partition参数如下。

    :param partition: The specific partition ID to send to. Default is None, in which case the service
     will assign to all partitions using round-robin.:type partition: str

所以实际上,您不能使用partitionKeyvalue 将事件发送到指定的 EventHub 分区,除非在 Python 中使用 AMQP 1.0。使用AMQP 1.0,请查看官方文档AMQP 1.0 in Azure Service Bus and Event Hubs protocol guidepartition-key在页面上搜索,结果如下。

//img1.sycdn.imooc.com//61e68ac60001061d10650370.jpg

查看完整回答
反对 回复 2022-01-18
?
萧十郎

TA贡献1815条经验 获得超13个赞

我不太确定,但使用 python ,这是打开连接的方法


def open(self):

        """

        Open the Sender using the supplied conneciton.

        If the handler has previously been redirected, the redirect

        context will be used to create a new handler before opening it.

        :param connection: The underlying client shared connection.

        :type: connection: ~uamqp.connection.Connection

        """

        self.running = True

        if self.redirected:

            self.target = self.redirected.address

            self._handler = SendClient(

                self.target,

                auth=self.client.get_auth(),

                debug=self.client.debug,

                msg_timeout=self.timeout,

                error_policy=self.retry_policy,

                keep_alive_interval=self.keep_alive,

                client_name=self.name,

                properties=self.client.create_properties())

        self._handler.open()

        while not self._handler.client_ready():

            time.sleep(0.05)

这是初始化


def __init__(self, client, target, partition=None, send_timeout=60, keep_alive=None, auto_reconnect=True):

        """

        Instantiate an EventHub event Sender handler.

        :param client: The parent EventHubClient.

        :type client: ~azure.eventhub.client.EventHubClient.

        :param target: The URI of the EventHub to send to.

        :type target: str

        :param partition: The specific partition ID to send to. Default is None, in which case the service

         will assign to all partitions using round-robin.

        :type partition: str

        :param send_timeout: The timeout in seconds for an individual event to be sent from the time that it is

         queued. Default value is 60 seconds. If set to 0, there will be no timeout.

        :type send_timeout: int

        :param keep_alive: The time interval in seconds between pinging the connection to keep it alive during

         periods of inactivity. The default value is None, i.e. no keep alive pings.

        :type keep_alive: int

        :param auto_reconnect: Whether to automatically reconnect the sender if a retryable error occurs.

         Default value is `True`.

        :type auto_reconnect: bool

        """

        self.running = False

        self.client = client

        self.target = target

        self.partition = partition

        self.timeout = send_timeout

        self.redirected = None

        self.error = None

        self.keep_alive = keep_alive

        self.auto_reconnect = auto_reconnect

        self.retry_policy = errors.ErrorPolicy(max_retries=3, on_error=_error_handler)

        self.reconnect_backoff = 1

        self.name = "EHSender-{}".format(uuid.uuid4())

        if partition:

            self.target += "/Partitions/" + partition

            self.name += "-partition{}".format(partition)

        self._handler = SendClient(

            self.target,

            auth=self.client.get_auth(),

            debug=self.client.debug,

            msg_timeout=self.timeout,

            error_policy=self.retry_policy,

            keep_alive_interval=self.keep_alive,

            client_name=self.name,

            properties=self.client.create_properties())

        self._outcome = None

        self._condition = None

我相信,下面的函数行只会创建一个分区发送者


if partition:

                self.target += "/Partitions/" + partition

                self.name += "-partition{}".format(partition)

参考


https://github.com/Azure/azure-event-hubs-python/blob/master/azure/eventhub/sender.py


希望能帮助到你。


查看完整回答
反对 回复 2022-01-18
  • 2 回答
  • 0 关注
  • 232 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信