
Incorrect SearchContext iteration with asyncio.TaskGroup in back-end #966


Open

jorgenherje opened this issue May 8, 2025 · 1 comment · May be fixed by #974
Assignees: jorgenherje
Labels: bug (Something isn't working), small issue

Comments

@jorgenherje (Collaborator)

In the back-end we run several index-based for loops that call getitem_async() on a SearchContext instance.

With the Sumo SearchContext we want to fetch items in parallel. This is done by awaiting sc.length_async and then iterating over the range, fetching each item with getitem_async. The problem is that SearchContext lazily initializes its internal sc._hits array, and with this code it is not initialized up front. The parallel tasks therefore trigger multiple concurrent initializations of sc._hits.

TODO: Replace the loop: use sc.uuids_async, then loop over the uuids and call sc.get_object_async instead of sc.getitem_async.

async def get_seismic_cube_meta_list_async(self) -> List[SeismicCubeMeta]:
    seismic_context = self._ensemble_context.cubes.filter(
        realization=self._realization,
    )
    
    length_cubes = await seismic_context.length_async()
    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(_get_seismic_cube_meta_async(seismic_context, i)) for i in range(length_cubes)]
    cube_meta_arr: list[SeismicCubeMeta] = [task.result() for task in tasks]
    return cube_meta_arr
    
    
async def _get_seismic_cube_meta_async(search_context: SearchContext, item_no: int) -> SeismicCubeMeta:
    seismic_cube = await search_context.getitem_async(item_no)
    ...
@jorgenherje jorgenherje self-assigned this May 8, 2025
@jorgenherje jorgenherje added bug Something isn't working small issue labels May 8, 2025
@jorgenherje (Collaborator, Author)

After discussion, the usage of asyncio.TaskGroup() will be replaced with a regular async for-loop over the SearchContext objects: async for obj in search_context.

For now we will use the regular async for obj in sc loop rather than sc.uuids_async and sc.get_object_async(). The uuids-based approach would perform one await to fetch the uuids, and thereafter run n requests towards Sumo in parallel.

This is because the SearchContext async iterator pre-fetches up to 100 objects while iterating. The first iteration of the for-loop therefore triggers a fetch and the loop has to await, but the result of that single POST towards Sumo contains up to 100 objects. If there are more than 100 objects, the second pre-fetch will of course also have to await, increasing the total time. However, this might still be better than firing more than 100 parallel requests towards Sumo?

The original candidate was:

sc = self._ensemble_context.cubes.filter(realization=self._realization)

uuids = await sc.uuids_async  # This initializes SearchContext._hits prior to the parallel loop
async with asyncio.TaskGroup() as tg:
    # n parallel requests to Sumo
    tasks = [tg.create_task(_get_object_attribute_async(sc, uuid)) for uuid in uuids]
results = [task.result() for task in tasks]

async def _get_object_attribute_async(sc: SearchContext, uuid: str):
    obj = await sc.get_object_async(uuid)
    return obj.attr

New solution:

sc = self._ensemble_context.cubes.filter(realization=self._realization)
obj_attr_list = []

# The async for-loop uses __anext__, which pre-fetches up to 100 objects on the first iteration and caches them
async for obj in sc:
    obj_attr_list.append(obj.attr)
