mirror of
https://github.com/home-assistant/core.git
synced 2025-07-29 18:28:14 +02:00
Move recorder chunk utils to shared collection utils (#118065)
This commit is contained in:
@ -2,13 +2,11 @@
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Callable, Collection, Generator, Iterable, Sequence
|
||||
from collections.abc import Callable, Generator, Sequence
|
||||
import contextlib
|
||||
from contextlib import contextmanager
|
||||
from datetime import date, datetime, timedelta
|
||||
import functools
|
||||
from functools import partial
|
||||
from itertools import islice
|
||||
import logging
|
||||
import os
|
||||
import time
|
||||
@ -859,36 +857,6 @@ def resolve_period(
|
||||
return (start_time, end_time)
|
||||
|
||||
|
||||
def take(take_num: int, iterable: Iterable) -> list[Any]:
|
||||
"""Return first n items of the iterable as a list.
|
||||
|
||||
From itertools recipes
|
||||
"""
|
||||
return list(islice(iterable, take_num))
|
||||
|
||||
|
||||
def chunked(iterable: Iterable, chunked_num: int) -> Iterable[Any]:
|
||||
"""Break *iterable* into lists of length *n*.
|
||||
|
||||
From more-itertools
|
||||
"""
|
||||
return iter(partial(take, chunked_num, iter(iterable)), [])
|
||||
|
||||
|
||||
def chunked_or_all(iterable: Collection[Any], chunked_num: int) -> Iterable[Any]:
|
||||
"""Break *collection* into iterables of length *n*.
|
||||
|
||||
Returns the collection if its length is less than *n*.
|
||||
|
||||
Unlike chunked, this function requires a collection so it can
|
||||
determine the length of the collection and return the collection
|
||||
if it is less than *n*.
|
||||
"""
|
||||
if len(iterable) <= chunked_num:
|
||||
return (iterable,)
|
||||
return chunked(iterable, chunked_num)
|
||||
|
||||
|
||||
def get_index_by_name(session: Session, table_name: str, index_name: str) -> str | None:
|
||||
"""Get an index by name."""
|
||||
connection = session.connection()
|
||||
|
Reference in New Issue
Block a user