3 essential async patterns for building a Python service

Jan 22, 2023 by iHash


Table of Contents

  • 3 essential async patterns for building a Python service
  • Make a graceful and fast exit
  • Pool your tasks
  • Harness your memory usage
  • What do you think?

Learn how to avoid the most common pitfalls when creating a service

A service is an application that runs forever and reacts to events. For example, a website or a web service gets HTTP requests and sends back responses in various formats (HTML, JSON, etc.). For the connectors-python project, we’re pulling work from Elasticsearch, grabbing data from some source to inject into Elasticsearch, and going back to an idling state.

The structure of such a service using modern Python is based on starting a global task via an async function that loops forever and idles when it’s not triggering some work. It’s a pretty straightforward pattern and offers a simple model: whenever something needs to happen in that main function, it’s deferred to another asynchronous task, and life goes on.

There are a few considerations to make such an application work well. We want to be sure to do the following:

  • Exit in a clean and fast way
  • Avoid a task explosion
  • Harness your memory usage

Projects like Trio, AnyIO, and to some extent, some features in Curio, are addressing these topics in various ways, but this blog post focuses on implementing solutions using vanilla Python.

[Related article: Perf8: Performance metrics for Python]

Make a graceful and fast exit

Let’s consider the following example, where the main task checks for some work to do and then sleeps for a minute by calling asyncio.sleep(). It implements a clean signal handler to do some cleanups and toggle the running flag for a graceful termination:

import asyncio
import os
import signal


async def main():
    loop = asyncio.get_running_loop()
    running = True

    def shutdown():
        nonlocal running
        # cleanup work
        running = False  # will end the loop

    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, shutdown)

    while running:
        await check_and_execute_work()
        await asyncio.sleep(60)

asyncio.run(main())

But this implementation will make your service unresponsive for up to a minute when handling the shutdown signals if it is waiting for sleep to be over. This can be a minor annoyance, but in some execution environments, it can be a bigger problem. You could also have concurrent sleep calls in other tasks in your application, which will slow down a clean termination because you will have to wait for all tasks to finish. 

To avoid the problem, we need to be able to immediately cancel all active sleeps. A simple pattern is to build an asyncio.sleep factory that keeps track of all sleep tasks and can cancel them if needed. 

Below is a class we use in our project:

class CancellableSleeps:
    def __init__(self):
        self._sleeps = set()

    async def sleep(self, delay, result=None, *, loop=None):
        async def _sleep(delay, result=None, *, loop=None):
            coro = asyncio.sleep(delay, result=result)
            task = asyncio.ensure_future(coro)
            self._sleeps.add(task)
            try:
                return await task
            except asyncio.CancelledError:
                print("Sleep canceled")
                return result
            finally:
                self._sleeps.remove(task)

        await _sleep(delay, result=result, loop=loop)

    def cancel(self):
        for task in self._sleeps:
            task.cancel()

This class keeps track of all running asyncio.sleep() tasks and provides a way to cancel them immediately. To use it in the service, you must create one class instance and have all the code use its sleep() method instead of asyncio.sleep().
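To see the effect in isolation, here is a minimal, self-contained sketch. It restates the class in a condensed form (without the nested helper and the unused loop argument) so it can run on its own, and it uses loop.call_later() as a stand-in for the signal handler. The demo() function and the 0.1-second delay are illustrative assumptions, not part of the project.

```python
import asyncio
import time


class CancellableSleeps:
    """Condensed restatement of the class above, for demo purposes only."""

    def __init__(self):
        self._sleeps = set()

    async def sleep(self, delay, result=None):
        task = asyncio.ensure_future(asyncio.sleep(delay, result=result))
        self._sleeps.add(task)
        try:
            return await task
        except asyncio.CancelledError:
            # the sleep was interrupted: return right away
            return result
        finally:
            self._sleeps.discard(task)

    def cancel(self):
        # copy the set so we never iterate over it while it changes
        for task in list(self._sleeps):
            task.cancel()


async def demo():
    sleeps = CancellableSleeps()
    loop = asyncio.get_running_loop()
    # stand-in for the shutdown signal handler (hypothetical timing)
    loop.call_later(0.1, sleeps.cancel)
    start = time.monotonic()
    # two long sleeps, as if two tasks were idling
    await asyncio.gather(sleeps.sleep(60), sleeps.sleep(120))
    return time.monotonic() - start


if __name__ == "__main__":
    elapsed = asyncio.run(demo())
    print(f"both sleeps returned after {elapsed:.2f}s")
```

Both sleeps return as soon as cancel() is called, rather than after 60 and 120 seconds.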

Below is the modified version of the main() function that uses the CancellableSleeps class:

async def main():
    sleeps = CancellableSleeps()
    loop = asyncio.get_running_loop()
    running = True

    def shutdown():
        nonlocal running
        # cleanup work
        running = False  # will end the loop
        sleeps.cancel()  # cancel all running sleep tasks

    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, shutdown)

    while running:
        await check_and_execute_work()
        await sleeps.sleep(60)

The shutdown() function will toggle the running flag and cancel all sleeps. That same sleeps object could be passed around and used by all tasks that need to run a sleep task.

Notice that this shared cancellation pattern is implemented in Trio if you use that library (see Trio’s core functionality).

Pool your tasks

A caller has two possible strategies to execute a task by calling an async function. It can wait for its termination with the await keyword, which makes the caller itself block for the result. The second option is to use a fire-and-forget strategy, where the caller adds a task in the event loop and continues with its life without waiting for the task to finish.

The second pattern is handy when you need concurrency from within an async function — for instance, when your code needs to send updates to Elasticsearch and you know you can do it through many concurrent requests.
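The two strategies can be sketched side by side. This is an illustration only: send() is a hypothetical stand-in for a network call, and the background set is one common way to keep a reference to a fire-and-forget task, since the event loop itself only holds weak references to tasks.

```python
import asyncio


async def send(batch):
    # hypothetical stand-in for a network call
    await asyncio.sleep(0.01)
    return len(batch)


async def main():
    # Strategy 1: await. The caller is suspended until send() is done.
    first = await send([1, 2, 3])

    # Strategy 2: fire and forget. The task is scheduled on the event
    # loop and the caller moves on immediately.
    background = set()
    task = asyncio.create_task(send([4, 5]))
    background.add(task)  # keep a strong reference to the task
    task.add_done_callback(background.discard)
    still_running = not task.done()

    # before leaving, wait for whatever is still in flight
    results = await asyncio.gather(*background)
    return first, still_running, results


if __name__ == "__main__":
    print(asyncio.run(main()))
```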

A possible function that is doing this could be:

async def send_data():
    tasks = []
    while have_data:
        batch = await get_batch_of_data()
        t = asyncio.create_task(send_data_to_es(batch))
        tasks.append(t)
        # drop the task from the list once it is done
        t.add_done_callback(tasks.remove)

    # make sure we wait for all tasks to end
    await asyncio.gather(*tasks)

This loop allows several send_data_to_es() tasks to run concurrently, but if get_batch_of_data() is much faster than send_data_to_es(), you’ve just built a task bomb! Calls to Elasticsearch are going to pile up and eventually time out. This may also lead to memory issues if the batches use a lot of memory.

To avoid this problem, send_data_to_es() tasks need to be throttled, and a generic way to do it is to build a pool of tasks with an upper limit on concurrency.

Below is a class that we use for this throttling:

import asyncio
import functools


class ConcurrentTasks:
    def __init__(self, max_concurrency=5, results_callback=None):
        self.max_concurrency = max_concurrency
        self.tasks = []
        self.results_callback = results_callback
        self._task_over = asyncio.Event()

    def __len__(self):
        return len(self.tasks)

    def _callback(self, task, result_callback=None):
        # runs when a task finishes: free its slot and wake up put()
        self.tasks.remove(task)
        self._task_over.set()
        if task.exception():
            raise task.exception()
        if result_callback is not None:
            result_callback(task.result())
        # global callback
        if self.results_callback is not None:
            self.results_callback(task.result())

    async def put(self, coroutine, result_callback=None):
        # If self.tasks has reached its max size
        # we wait for one task to finish
        while len(self.tasks) >= self.max_concurrency:
            # rearm before waiting, so that a task that finished while
            # the pool still had room cannot let us through right away
            self._task_over.clear()
            await self._task_over.wait()
        task = asyncio.create_task(coroutine())
        self.tasks.append(task)
        task.add_done_callback(
            functools.partial(self._callback, result_callback=result_callback)
        )
        return task

    async def join(self):
        await asyncio.gather(*self.tasks)

It’s a bit different from a queue because there’s no sequential ordering for triggering the execution of tasks. The class will ensure that no more than max_concurrency tasks can be active simultaneously.

The class offers two methods: put(), which can be used to add an async function for execution, and join(), which can be used to wait until all tasks are over. put() will block until the pool has room for a new task. Below is the modified send_data() function that uses it:

import functools

async def send_data():
    tasks = ConcurrentTasks(5)
    while have_data:
        batch = await get_batch_of_data()
        # put() expects a callable, so we bind the batch with partial()
        await tasks.put(
            functools.partial(send_data_to_es, batch)
        )

    # make sure we wait for all tasks to end
    await tasks.join()

A couple of libraries provide a similar feature: aiometer, asyncio-pool.
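For comparison, the same upper bound can be sketched with nothing more than asyncio.Semaphore from the standard library. This is not what the project uses; run_throttled() and its peak counter are hypothetical names added here only to make the limit observable. The producer acquires a slot before creating each task, so it is held back whenever max_concurrency tasks are already running.

```python
import asyncio


async def run_throttled(jobs, max_concurrency=5):
    """Run zero-argument async callables, at most max_concurrency at once.

    Returns the highest number of jobs that were active at the same time.
    """
    semaphore = asyncio.Semaphore(max_concurrency)
    tasks = set()
    active = 0
    peak = 0

    async def worker(job):
        nonlocal active, peak
        active += 1
        peak = max(peak, active)
        try:
            return await job()
        finally:
            active -= 1
            semaphore.release()  # free the slot for the producer

    for job in jobs:
        # blocks the producer while the pool is full
        await semaphore.acquire()
        task = asyncio.create_task(worker(job))
        tasks.add(task)
        task.add_done_callback(tasks.discard)

    # wait for the tasks that are still in flight
    await asyncio.gather(*tasks)
    return peak


async def fake_send():
    # hypothetical stand-in for send_data_to_es()
    await asyncio.sleep(0.01)


if __name__ == "__main__":
    peak = asyncio.run(run_throttled([fake_send] * 20, max_concurrency=3))
    print("peak concurrency:", peak)
```

Unlike the class above, this sketch has no per-task result callbacks; it only shows the throttling part.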

Python 3.11, released just a few months ago, also introduced the TaskGroup class, which can be used as a context manager to run tasks concurrently. From the documentation:

async def main():
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(some_coro(...))
        task2 = tg.create_task(another_coro(...))
    print("Both tasks have completed now.")

We are not using Python 3.11 yet in production as it is quite recent, but moving to asyncio.TaskGroup going forward would make a lot of sense.

Harness your memory usage

Memory is a critical resource for a service application. The service can’t grow indefinitely in memory when dealing with a lot of data.

In connectors-python, we are building an intermediate service between a third-party service that provides some data and Elasticsearch. The data gets massaged in our pipeline and is passed along. Running many concurrent tasks that hold data to do this work means that the Python heap and its process Resident Set Size (RSS) will grow.

In the previous example, the send_data() function will hold in memory five concurrent batches of data it is planning to send to Elasticsearch. Depending on how get_batch_of_data and send_data_to_es are implemented, you might get up to three copies of the same batch simultaneously in the heap unless you pass around the same objects. 

To avoid memory bloat, we need to have an upper limit for the data the service holds. Python has a queue class that we can use to store and limit items we are dealing with, but it is not memory aware — it does not know how much memory each item uses.

You could use sys.getsizeof() on simple objects, but it won’t work for complex objects composed of other objects. It won’t recursively find out the memory size of each linked object, and the value you will get will be off.

>>> import sys
>>> sys.getsizeof(['A', 'B', []])
80
>>> sys.getsizeof(['A', 'B', ['C', 'D', 'E']])
80

To get the real size, you can use the Pympler library:

>>> from pympler import asizeof
>>> asizeof.asizeof(['A', 'B', []])
248
>>> asizeof.asizeof(['A', 'B', ['C', 'D', 'E']])
448
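To make the difference between a shallow and a deep measurement concrete, here is a rough sketch of a recursive size function built on sys.getsizeof(). The deep_size() helper is hypothetical and deliberately simplified: it only walks the common built-in containers and counts shared objects once, so its numbers will not match what Pympler reports.

```python
import sys


def deep_size(obj, _seen=None):
    """Rough recursive size in bytes (simplified, not Pympler's algorithm)."""
    seen = set() if _seen is None else _seen
    if id(obj) in seen:
        # already counted: shared objects are only counted once
        return 0
    seen.add(id(obj))

    size = sys.getsizeof(obj)  # shallow size of the object itself
    if isinstance(obj, dict):
        size += sum(
            deep_size(key, seen) + deep_size(value, seen)
            for key, value in obj.items()
        )
    elif isinstance(obj, (list, tuple, set, frozenset)):
        size += sum(deep_size(item, seen) for item in obj)
    return size


if __name__ == "__main__":
    flat = ['A', 'B', []]
    nested = ['A', 'B', ['C', 'D', 'E']]
    print(sys.getsizeof(flat), deep_size(flat))
    print(sys.getsizeof(nested), deep_size(nested))
```

The shallow size is the same for both lists, while the deep size grows with the content of the inner list.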

Calling asizeof() has a small CPU cost, but it is extremely useful to measure the real memory you are using. Based on this library, we’ve created a MemQueue class derived from asyncio.Queue, which stores the memory size of each item and lets you define a maximum size in memory.

This class computes the size of each item when put() or put_nowait() is called, and provides a locking mechanism similar to the one in asyncio.Queue.

# get_size() and full(item_size) are not shown here; they belong to the
# full implementation of the class
class MemQueue(asyncio.Queue):
    def __init__(
        self, maxsize=0, maxmemsize=0,
        refresh_interval=1.0, refresh_timeout=60
    ):
        super().__init__(maxsize)
        self.maxmemsize = maxmemsize
        self.refresh_interval = refresh_interval
        self.refresh_timeout = refresh_timeout

    async def put(self, item):
        item_size = get_size(item)

        # specific locking code (see original class in GitHub)

        # items are stored together with their size
        super().put_nowait((item_size, item))

    def put_nowait(self, item):
        item_size = get_size(item)

        if self.full(item_size):
            raise asyncio.QueueFull
        super().put_nowait((item_size, item))

The locking mechanism is based on creating a Future object that gets enqueued in a collections.deque; put() waits on that future until a get() call removes an item and unblocks it. On top of this, a timeout is raised after some time if a put() can’t succeed. You can find the full implementation of this class in the utils.py module of the project: https://github.com/elastic/connectors-python/blob/main/connectors/utils.py.
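The idea of a put() that waits for memory to be freed can be sketched in a few lines. The TinyMemQueue class below is hypothetical and is not the project's implementation: it swaps the deque of futures for an asyncio.Condition, measures items with the shallow sys.getsizeof() to stay dependency-free, and has no timeout.

```python
import asyncio
import collections
import sys


class TinyMemQueue:
    """Simplified memory-bounded queue (illustration only)."""

    def __init__(self, maxmemsize):
        self.maxmemsize = maxmemsize
        self.memsize = 0  # bytes currently held in the queue
        self._items = collections.deque()
        self._cond = asyncio.Condition()

    def _fits(self, size):
        # an empty queue always accepts one item, even an oversized one
        return not self._items or self.memsize + size <= self.maxmemsize

    async def put(self, item):
        size = sys.getsizeof(item)  # shallow size, for simplicity
        async with self._cond:
            # wait until a get() has freed enough memory
            await self._cond.wait_for(lambda: self._fits(size))
            self._items.append((size, item))
            self.memsize += size
            self._cond.notify_all()

    async def get(self):
        async with self._cond:
            await self._cond.wait_for(lambda: len(self._items) > 0)
            size, item = self._items.popleft()
            self.memsize -= size
            self._cond.notify_all()  # wake up blocked producers
            return size, item


async def demo():
    q = TinyMemQueue(maxmemsize=2500)
    observed = []

    async def producer():
        for i in range(10):
            await q.put(bytes([i]) * 1000)  # roughly 1 KB per item
        await q.put(None)  # end marker

    async def consumer():
        while True:
            observed.append(q.memsize)
            _size, item = await q.get()
            if item is None:
                break
            await asyncio.sleep(0)  # let the producer run

    await asyncio.gather(producer(), consumer())
    return max(observed)


if __name__ == "__main__":
    print("peak bytes held:", asyncio.run(demo()))
```

The producer is held back as soon as a new item would push the queue over maxmemsize, and resumes once the consumer has removed something.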

The MemQueue class is almost a drop-in replacement for asyncio.Queue; the main difference is that get() returns a (size, item) tuple. In the example below, it’s used in a producer-consumer pattern:

FINISHED = -1

# queue of 5 MiB max, and 1000 items max
q = MemQueue(maxsize=1000, maxmemsize=5*1024*1024)

async def pull_data():
    async for item in stream_of_data():
        await q.put(item)  # blocks until the queue is below both limits
    await q.put(FINISHED)

async def push_data():
    while True:
        item_size, item = await q.get()
        if item == FINISHED:
            break
        push_item(item)

async def main():
    # asyncio.run() needs a coroutine, so gather() is wrapped in one
    await asyncio.gather(pull_data(), push_data())

# running the producer and consumer
asyncio.run(main())

What do you think?

You can find all these classes in action in the connectors-python project. If you use them or have improvement ideas, please start a discussion in our issue tracker. It’s an open-source project, and contributions are welcome!
