This blog post presents a simple implementation of Bluebird’s Promise.map function in Python. I tend to code most of my bots in NodeJS since I find it really convenient to manage asynchronous tasks (HTTP requests here) using async/await. A NodeJS library I rely on heavily is Bluebird, and in particular its Promise.map function. Promise.map provides a simple mechanism to manage a pool of asynchronous tasks (promises) so that at most N tasks are running at any given time (where N can be passed as an argument).
I was interested to know if there was an equivalent in Python.
After a quick search, I found concurrent.futures.ThreadPoolExecutor.
However, concurrent.futures.ThreadPoolExecutor.map takes a synchronous function as input, which is not convenient if your code is asynchronous, e.g. if you use aiohttp to make HTTP requests.
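To make the limitation concrete, here is a minimal sketch of how ThreadPoolExecutor.map is typically used. The square function and the run_pool helper are hypothetical names chosen for illustration; the point is only that the mapper is a plain synchronous function executed in worker threads, not a coroutine.

```python
from concurrent.futures import ThreadPoolExecutor


def square(x):
    # Hypothetical synchronous mapper: a regular function, not "async def"
    return x * x


def run_pool(values, max_workers=4):
    # At most "max_workers" calls to square run at the same time, in threads
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # executor.map yields results in the same order as the input values
        return list(executor.map(square, values))


pool_results = run_pool([1, 2, 3, 4])
print(pool_results)
```

Passing an "async def" function as the mapper here would only produce un-awaited coroutine objects, which is why this does not fit aiohttp-based code directly.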
I didn’t search further and used this as a pretext to get more familiar with Python asyncio and to code an equivalent of Bluebird’s Promise.map in Python.
Warning: I don’t think my implementation is really Pythonic; it should just be seen as an exercise ;)
Promise.map: Example with aiohttp
To showcase our implementation of Promise.map in Python, we use an example of a parallel crawler, i.e. a crawler that fetches several pages in parallel.
First, we define an asynchronous get_url function to fetch the content of a single URL using aiohttp and extract all the links of the blog posts present on the page.
import asyncio
import aiohttp
from bs4 import BeautifulSoup
async def get_url(url):
    headers = {
        "accept-language": "en-US;q=0.8,en;q=0.7",
        "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.96 Safari/537.36"
        # Add any header you want
    }
    print("Crawling: {}".format(url))
    # The context managers close the response and the session (and its
    # connector) even if the request fails
    async with aiohttp.ClientSession() as session:
        async with session.get(url, headers=headers) as resp:
            html_content = await resp.text()
    print("Crawled: {}".format(url))
    # Extract "link;title" for each blog post listed on the page
    soup = BeautifulSoup(html_content, 'html.parser')
    results = []
    for link in soup.select('a.storylink'):
        results.append('{};{}'.format(link.get('href'), link.text))
    return results
Now that we have our get_url function to asynchronously fetch the list of blog posts on a URL, we define the promise_map function that mimics the behavior of Bluebird’s Promise.map function.
It takes 3 values as input:
- values: the values you want to iterate on;
- mapper: the function you want to apply to each element of values. Note that my implementation requires the mapper to take a single argument;
- concurrency: the number of parallel tasks.
async def promise_map(values, mapper, concurrency):
    async def mapper_wrapper(iterator, mapper):
        # store all results of a given coroutine in a list
        res_coroutine = []
        
        # Iterate on the iterator
        # Note that this iterator is shared among all coroutines
        # so that each element of "values" is handled only once
        for elt in iterator:
            # Call the "mapper" function and wait for it to finish
            res_mapper = await mapper(elt)
            # Store the result of the function into res_coroutine
            res_coroutine.append(res_mapper)
        # When there are no more values to iterate on, the coroutine
        # returns all its results
        return res_coroutine
    coroutines = []
    # get an iterator on the values we want to iterate on
    # the iterator will be shared among the "concurrency" workers
    values_iterator = iter(values)
    # Spawn "concurrency" coroutines
    for _ in range(concurrency):
        # we store all returned coroutines in a list
        coroutines.append(mapper_wrapper(values_iterator, mapper))
    # Once all coroutines have been spawned, we await them
    results = await asyncio.gather(*coroutines)
    # Each coroutine returns a list
    # We flatten the list of lists to obtain a flat list of results.
    # Results are grouped per worker, so their order may differ from "values"
    return [v for res_coroutine in results for v in res_coroutine]
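One difference with Bluebird is worth noting: the function above groups results per worker, so the output order does not necessarily match the order of values, whereas Bluebird’s Promise.map returns results in input order. If order matters, one possible alternative (not the approach used in this post) is to bound concurrency with asyncio.Semaphore and let asyncio.gather keep the ordering. The names promise_map_ordered and slow_double below are hypothetical.

```python
import asyncio


async def promise_map_ordered(values, mapper, concurrency):
    # The semaphore lets at most "concurrency" mapper calls run at once
    semaphore = asyncio.Semaphore(concurrency)

    async def limited(value):
        async with semaphore:
            return await mapper(value)

    # asyncio.gather returns results in the order of the awaitables it
    # receives, regardless of which one finishes first
    return await asyncio.gather(*(limited(v) for v in values))


async def demo():
    async def slow_double(x):
        # Earlier values sleep longer, so they tend to finish last
        await asyncio.sleep(0.01 * (5 - x))
        return x * 2

    return await promise_map_ordered(range(5), slow_double, 2)


ordered = asyncio.run(demo())
print(ordered)
```

The trade-off is that this variant creates one coroutine per value up front, while the shared-iterator version only ever creates as many workers as the concurrency level.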
Finally, we define an asynchronous main function and apply our Python implementation of Promise.map to a set of Hacker News URLs.
async def main():
    # Number of pages we want to iterate on
    num_pages = 15
    # Number of concurrent requests
    concurrency = 4
    # range() excludes its upper bound, hence the + 1 to fetch pages 1 to num_pages
    urls = ['https://news.ycombinator.com/news?p={}'.format(idx_page) for idx_page in range(1, num_pages + 1)]
    # Launch "concurrency" coroutines (here 4) to fetch blog post titles present on Hacker News
    res = await promise_map(urls, get_url, concurrency)
    print("Fetched {} links".format(len(res)))
# asyncio.run creates the event loop, runs main() and closes the loop (Python 3.7+)
asyncio.run(main())
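To check the concurrency limit without hitting the network, here is a small sketch that restates the shared-iterator idea in compact form and runs it against a fake mapper. The names pool_map, fake_fetch and check are hypothetical, and the sleep only stands in for an HTTP request.

```python
import asyncio


async def pool_map(values, mapper, concurrency):
    # A single iterator shared by all workers: each value is consumed once
    iterator = iter(values)

    async def worker():
        collected = []
        for value in iterator:
            collected.append(await mapper(value))
        return collected

    per_worker = await asyncio.gather(*(worker() for _ in range(concurrency)))
    # Flatten the per-worker lists into a single list
    return [item for chunk in per_worker for item in chunk]


async def check():
    active = 0  # number of mapper calls currently in flight
    peak = 0    # highest value reached by "active"

    async def fake_fetch(n):
        nonlocal active, peak
        active += 1
        peak = max(peak, active)
        await asyncio.sleep(0.01)  # stands in for an HTTP request
        active -= 1
        return n * 10

    results = await pool_map(range(1, 8), fake_fetch, 3)
    return results, peak


results, peak = asyncio.run(check())
print(sorted(results), peak)
```

Every value is processed exactly once, and the number of in-flight calls never exceeds the requested concurrency.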
You can find the code of this bot on GitHub in my Bots zoo repository. You’ll also find other examples of bots in Python/NodeJS, as well as lists of user-agents and HTTP headers:
- Selenium, Puppeteer, Playwright bots;
- Parallel/sequential bots using simple NodeJS HTTP requests.
