Source code for rattle.api

# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

import logging
import multiprocessing
import os
import sys
from collections.abc import Collection, Generator, Iterable
from dataclasses import dataclass, field, replace
from functools import partial
from multiprocessing.context import BaseContext
from pathlib import Path
from typing import Any, cast

import trailrunner
from libcst import ParserSyntaxError

from .cache import ResultCache
from .config import collect_rules, generate_config
from .console import echo, echo_color_precomputed_diff
from .engine import LintRunner, diff_module
from .format import format_module, format_paths
from .ftypes import (
    STDIN,
    Config,
    FileContent,
    LintViolation,
    Metrics,
    MetricsHook,
    Options,
    OutputFormat,
    Result,
)
from .output import render_rattle_result
from .rule import LintRule

LOG = logging.getLogger(__name__)
ConfiguredPath = tuple[Path, Config, bool]
ConfiguredPathBatch = list[ConfiguredPath]


@dataclass(frozen=True)
class ConfiguredPathBatchResult:
    results: list[Result]
    deferred_format_paths: list[Path]
    metrics: list[Metrics] = field(default_factory=list)


@dataclass
class ConfiguredFileCacheSession:
    path: Path
    config: Config
    stat: os.stat_result
    rules: Collection[LintRule]
    autofix: bool = False
    include_diff: bool = False
    allow_cached_dirty_results: bool = False
    options: Options | None = None
    explicit_path: bool = False
    cache: ResultCache | None = None
    cache_key: str = ""
    cached_results: list[Result] | None = None
    cached_autofix_rule_names: set[str] = field(default_factory=set)
    cached_passthrough: list[Result] = field(default_factory=list)

    @classmethod
    def from_environment(
        cls,
        *,
        path: Path,
        config: Config,
        stat: os.stat_result,
        rules: Collection[LintRule],
        autofix: bool,
        include_diff: bool,
        allow_cached_dirty_results: bool,
        options: Options | None,
        explicit_path: bool,
    ) -> "ConfiguredFileCacheSession":
        return cls(
            path=path,
            config=config,
            stat=stat,
            rules=rules,
            autofix=autofix,
            include_diff=include_diff,
            allow_cached_dirty_results=allow_cached_dirty_results,
            options=options,
            explicit_path=explicit_path,
            cache=ResultCache.from_environment(),
        )

    def read_complete_entry(self) -> bool:
        if self.cache is None:
            return False

        self.cache_key = self.cache.result_key(
            self.path,
            self.stat,
            self.config,
            include_diff=self.include_diff,
        )
        self.cached_results, self.cached_autofix_rule_names, cached_result_is_complete = (
            self.cache.read_configured_file(
                self.cache_key,
                self.stat,
                path=self.path,
                config=self.config,
                rules=self.rules,
                autofix=self.autofix,
                allow_cached_dirty_results=self.allow_cached_dirty_results,
            )
        )
        return cached_result_is_complete

    def cached_results_or_empty(self) -> list[Result]:
        return self.cached_results or []

    def prepare_autofix_passthrough(self) -> list[Result]:
        if not self.cached_autofix_rule_names:
            return []

        self.cached_passthrough = [
            result
            for result in self.cached_results or ()
            if result.violation is not None
            and result.violation.rule_name not in self.cached_autofix_rule_names
        ]
        self.rules = [rule for rule in self.rules if rule.name in self.cached_autofix_rule_names]
        return self.cached_passthrough

    def write_result(
        self,
        content: FileContent,
        clean: bool,
        cacheable: bool,
        cache_violations: list[LintViolation],
    ) -> None:
        if self.cache is None:
            return

        if clean and not self.cached_passthrough:
            self.cache.write_result(self.cache_key, self.stat, rules=self.rules)
            self.cache.write_clean_status(
                self.path,
                self.stat,
                options=self.options,
                explicit_path=self.explicit_path,
                include_diff=self.include_diff,
                rules=self.rules,
            )
            return

        if cacheable and cache_violations:
            self.cache.write_result(
                self.cache_key,
                self.stat,
                source=content,
                violations=cache_violations,
                rules=self.rules,
            )


@dataclass
class ConfiguredFileRun:
    path: Path
    config: Config
    autofix: bool = False
    include_diff: bool = False
    allow_cached_dirty_results: bool = False
    deferred_format_paths: list[Path] | None = None
    options: Options | None = None
    explicit_path: bool = False
    metrics_hook: MetricsHook | None = None
    content: FileContent | None = None
    stat: os.stat_result | None = None
    rules: Collection[LintRule] = ()
    cache_session: ConfiguredFileCacheSession | None = None

    def run(self) -> Generator[Result, bool, None]:
        self.path = self.path.resolve()

        try:
            self.stat = self.path.stat()
            self.rules = collect_rules(self.config)
            if self.metrics_hook is None:
                self.cache_session = ConfiguredFileCacheSession.from_environment(
                    path=self.path,
                    config=self.config,
                    stat=self.stat,
                    rules=self.rules,
                    autofix=self.autofix,
                    include_diff=self.include_diff,
                    allow_cached_dirty_results=self.allow_cached_dirty_results,
                    options=self.options,
                    explicit_path=self.explicit_path,
                )
                if self.cache_session.read_complete_entry():
                    yield from self.cache_session.cached_results_or_empty()
                    return

                yield from self.cache_session.prepare_autofix_passthrough()
                self.rules = self.cache_session.rules

            if not self.rules:
                self.content = self.path.read_bytes()
                yield Result(
                    self.path,
                    violation=None,
                    source=self.content,
                    config=self.config,
                )
                return

            self.content = self.path.read_bytes()
            lint_config = replace(self.config, formatter=None) if self.defer_format else self.config
            runner = rattle_bytes(
                self.path,
                self.content,
                config=lint_config,
                autofix=self.autofix,
                include_diff=self.include_diff,
                rules=self.rules,
                metrics_hook=self.metrics_hook,
            )
            updated, clean, cacheable, cache_violations = yield from _drive_rattle_bytes(
                runner,
                cacheable=not self.autofix,
            )
            self._store_result(updated, clean, cacheable, cache_violations)

        except Exception as error:  # noqa: BLE001 - file boundary
            LOG.debug("Exception while rattle_configured_file", exc_info=error)
            yield Result.from_exception(
                self.path,
                error,
                source=self.content,
                config=self.config,
            )

    @property
    def defer_format(self) -> bool:
        return (
            self.deferred_format_paths is not None
            and self.autofix
            and self.config.formatter == "ruff"
        )

    def _store_result(
        self,
        updated: FileContent | None,
        clean: bool,
        cacheable: bool,
        cache_violations: list[LintViolation],
    ) -> None:
        assert self.stat is not None
        if updated and updated != self.content:
            LOG.info("%s: writing changes to file", self.path)
            self.path.write_bytes(updated)
            if self.defer_format and self.deferred_format_paths is not None:
                self.deferred_format_paths.append(self.path)
            return

        if self.content is not None and self.cache_session is not None:
            self.cache_session.write_result(
                self.content,
                clean=clean,
                cacheable=cacheable,
                cache_violations=cache_violations,
            )


@dataclass
class PathLintRun:
    paths: Iterable[Path]
    autofix: bool = False
    include_diff: bool = False
    allow_cached_dirty_results: bool = False
    options: Options | None = None
    parallel: bool = True
    metrics_hook: MetricsHook | None = None
    expanded_paths: list[tuple[Path, bool]] = field(default_factory=list)
    included_paths: list[ConfiguredPath] = field(default_factory=list)
    deferred_format_paths: list[Path] = field(default_factory=list)

    def run(self) -> Generator[Result, bool, None]:
        paths = tuple(self.paths)
        if not paths:
            return

        self.expanded_paths, is_stdin, stdin_path = _expand_paths(paths)
        if is_stdin:
            yield from rattle_stdin(
                stdin_path,
                autofix=self.autofix,
                include_diff=self.include_diff,
                options=self.options,
                metrics_hook=self.metrics_hook,
            )
            return

        self.included_paths = _configured_paths(
            self._pending_paths(),
            options=self.options,
        )
        if len(self.included_paths) == 1 or not self.parallel:
            yield from self.run_serial()
            return

        yield from self.run_parallel()

    def run_configured_group(self, group: list[ConfiguredPath]) -> Generator[Result, bool, None]:
        self.included_paths = group
        yield from self.run_parallel()

    def run_serial(self) -> Generator[Result, bool, None]:
        for path, config, explicit_path in self.included_paths:
            yield from rattle_configured_file(
                path,
                config=config,
                autofix=self.autofix,
                include_diff=self.include_diff,
                allow_cached_dirty_results=self.allow_cached_dirty_results,
                deferred_format_paths=self.deferred_format_paths,
                options=self.options,
                explicit_path=explicit_path,
                metrics_hook=self.metrics_hook,
            )
        self._format_deferred_paths()

    def run_parallel(self) -> Generator[Result, bool, None]:
        concurrency = self._concurrency()
        if concurrency <= 1:
            yield from self.run_serial()
            return

        fn = partial(
            _rattle_configured_file_batch_wrapper,
            autofix=self.autofix,
            include_diff=self.include_diff,
            allow_cached_dirty_results=self.allow_cached_dirty_results,
            options=self.options,
            collect_metrics=self.metrics_hook is not None,
        )
        context = _process_context()
        if context is not None:
            _preload_rules_for_fork(self.included_paths)
        batches = _configured_path_batches(self.included_paths, concurrency=concurrency)
        runner = trailrunner.Trailrunner(concurrency=concurrency, context=context)
        batch_results = cast(
            Iterable[tuple[ConfiguredPathBatch, ConfiguredPathBatchResult | list[Result]]],
            cast(Any, runner).run_iter(batches, fn),
        )
        for _, batch_result in batch_results:
            if isinstance(batch_result, ConfiguredPathBatchResult):
                self.deferred_format_paths.extend(batch_result.deferred_format_paths)
                if self.metrics_hook is not None:
                    for metrics in batch_result.metrics:
                        self.metrics_hook(metrics)
                yield from batch_result.results
            else:
                yield from batch_result

        self._format_deferred_paths()

    def _pending_paths(self) -> list[tuple[Path, bool]]:
        cache = ResultCache.from_environment() if self.metrics_hook is None else None
        if cache is None:
            return [(path.resolve(), explicit_path) for path, explicit_path in self.expanded_paths]

        return cache.collect_pending_paths(
            self.expanded_paths,
            include_diff=self.include_diff,
            options=self.options,
        )

    def _concurrency(self) -> int:
        configured_jobs = self.options.jobs if self.options is not None else None
        worker_count = (
            configured_jobs
            if configured_jobs is not None
            else _default_worker_count(
                file_count=len(self.included_paths),
                total_bytes=_configured_path_total_bytes(self.included_paths),
            )
        )
        return min(len(self.included_paths), worker_count)

    def _format_deferred_paths(self) -> None:
        _format_deferred_paths(self.deferred_format_paths)


def _available_cpu_count() -> int:
    try:
        return len(os.sched_getaffinity(0))
    except AttributeError:
        return os.cpu_count() or 1


def _default_worker_count(
    *,
    file_count: int,
    total_bytes: int | None = None,
    cpu_count: int | None = None,
) -> int:
    """
    Pick a fast default worker count without saturating the machine.

    Process startup and rule/config import costs dominate small lint runs, so the
    automatic default intentionally stays conservative unless there is enough
    work to amortize additional workers.
    """
    available = max(1, cpu_count if cpu_count is not None else _available_cpu_count())
    if file_count < 8:
        return 1

    if total_bytes is not None and total_bytes < 2_000_000:
        return min(4, available, file_count)

    return min(8, available, max(1, file_count // 8))


def _configured_path_total_bytes(group: Collection[ConfiguredPath]) -> int | None:
    total = 0
    try:
        for path, _config, _explicit_path in group:
            total += path.stat().st_size
    except OSError:
        return None
    return total


def _configured_path_batches(
    group: list[ConfiguredPath],
    *,
    concurrency: int,
) -> list[ConfiguredPathBatch]:
    chunk_size = max(1, len(group) // max(1, concurrency * 4))
    return [group[index : index + chunk_size] for index in range(0, len(group), chunk_size)]


def _process_context() -> BaseContext | None:
    if os.name != "posix":
        return None
    try:
        return multiprocessing.get_context("fork")
    except ValueError:
        return None


def _display_path(path: Path) -> Path:
    try:
        return path.relative_to(Path.cwd())
    except ValueError:
        return path


def _drive_rattle_bytes(
    runner: Generator[Result, bool, FileContent | None],
    *,
    cacheable: bool,
) -> Generator[Result, bool, tuple[FileContent | None, bool, bool, list[LintViolation]]]:
    clean = True
    cache_violations: list[LintViolation] = []

    while True:
        try:
            result = next(runner)
        except StopIteration as stop:
            return stop.value, clean, cacheable, cache_violations

        while True:
            if result.violation or result.error:
                clean = False
            if result.error:
                cacheable = False
            if result.violation:
                cache_violations.append(result.violation)
            send_value = yield result
            try:
                result = runner.send(bool(send_value))
            except StopIteration as stop:
                return stop.value, clean, cacheable, cache_violations


def _print_rattle_result(
    result: Result, *, path: Path, show_diff: bool, stderr: bool, brief: bool
) -> bool:
    rendered = render_rattle_result(result, path=path, color=True, brief=brief)
    if rendered is None:
        return False

    echo(rendered, err=stderr)
    if show_diff and result.violation and result.violation.diff:
        echo_color_precomputed_diff(result.violation.diff, err=stderr)
    if not brief or (show_diff and result.violation and result.violation.diff):
        echo(err=stderr)
    return True


def _print_violation_result(
    result: Result,
    *,
    path: Path,
    show_diff: bool,
    stderr: bool,
    output_format: OutputFormat,
    output_template: str,
    brief: bool,
) -> bool:
    violation = result.violation
    assert violation is not None
    assert violation.range is not None

    rule_name = violation.rule_name
    start_line = violation.range.start.line
    start_col = violation.range.start.column
    message = violation.message
    if violation.autofixable:
        message += " (has autofix)"

    if output_format == OutputFormat.rattle:
        if _print_rattle_result(result, path=path, show_diff=show_diff, stderr=stderr, brief=brief):
            return True
        raise NotImplementedError("missing rattle renderer for lint violation")

    if output_format == OutputFormat.vscode:
        line = f"{path}:{start_line}:{start_col} {rule_name}: {message}"
    elif output_format == OutputFormat.custom:
        line = output_template.format(
            message=message,
            path=path,
            result=result,
            rule_name=rule_name,
            start_col=start_col,
            start_line=start_line,
        )
    else:
        raise NotImplementedError(f"output-format = {output_format!r}")

    echo(line, color="yellow", err=stderr)
    if show_diff and violation.diff:
        echo_color_precomputed_diff(violation.diff, err=stderr)
    return True


def _print_error_result(
    result: Result,
    *,
    path: Path,
    show_diff: bool,
    stderr: bool,
    output_format: OutputFormat,
    brief: bool,
) -> bool:
    error, tb = result.error or (None, "")
    assert error is not None

    if output_format == OutputFormat.rattle and isinstance(error, ParserSyntaxError):
        if _print_rattle_result(result, path=path, show_diff=show_diff, stderr=stderr, brief=brief):
            return True
        raise NotImplementedError("missing rattle renderer for syntax error")

    echo(f"{path}: EXCEPTION: {error}", color="red", err=stderr)
    echo(tb.strip(), err=stderr)
    return True


def _expand_paths(paths: Iterable[Path]) -> tuple[list[tuple[Path, bool]], bool, Path]:
    expanded_paths: list[tuple[Path, bool]] = []
    is_stdin = False
    stdin_path = Path("stdin")

    for index, path in enumerate(paths):
        if path == STDIN:
            if index == 0:
                is_stdin = True
            else:
                LOG.warning("Cannot mix stdin ('-') with normal paths, ignoring")
        elif is_stdin:
            if index == 1:
                stdin_path = path
            else:
                raise ValueError("too many stdin paths")
        else:
            is_explicit = path.is_file()
            if is_explicit:
                expanded_paths.append((path, True))
            else:
                expanded_paths.extend(
                    (expanded_path, False) for expanded_path in trailrunner.walk(path)
                )

    return expanded_paths, is_stdin, stdin_path






def _rattle_bytes_autofix_with_diff(
    path: Path,
    content: FileContent,
    *,
    config: Config,
    rules: Collection[LintRule],
    metrics_hook: MetricsHook | None,
) -> Generator[Result, bool, FileContent | None]:
    runner = LintRunner(path, content)
    violations = list(
        runner.collect_violations(
            rules,
            config,
            metrics_hook,
            include_diff=False,
        )
    )
    if not violations:
        yield Result(path, violation=None, source=content, config=config)
        return None

    pending_fixes = [violation for violation in violations if violation.replacement]
    updated = runner.apply_replacements(pending_fixes) if pending_fixes else None
    aggregate_diff = diff_module(path, runner.module, updated) if updated else ""
    diff_consumed = False
    for violation in violations:
        if aggregate_diff and violation.replacement and not diff_consumed:
            violation = replace(violation, diff=aggregate_diff)
            diff_consumed = True
        yield Result(path, violation, source=content, config=config)

    if updated:
        return format_module(updated, path, config)
    return None


[docs] def rattle_bytes( path: Path, content: FileContent, *, config: Config, autofix: bool = False, include_diff: bool = False, rules: Collection[LintRule] | None = None, metrics_hook: MetricsHook | None = None, ) -> Generator[Result, bool, FileContent | None]: """ Lint raw bytes content representing a single path, using the given configuration. Yields :class:`Result` objects for each lint error or exception found, or a single empty result if the file is clean. A file is considered clean if no lint errors or no rules are enabled for the given path. Returns the final :class:`FileContent` including any fixes applied. Use :func:`capture` to more easily capture return value after iterating through violations. Use ``generator.send(...)`` with a boolean value to apply individual fixes for each violation. If ``autofix`` is ``True``, all violations with replacements will be applied automatically, even if ``False`` is sent back to the generator. """ try: rules = rules if rules is not None else collect_rules(config) if not rules: yield Result(path, violation=None, source=content, config=config) return None if autofix and include_diff: return ( yield from _rattle_bytes_autofix_with_diff( path, content, config=config, rules=rules, metrics_hook=metrics_hook, ) ) runner = LintRunner(path, content) pending_fixes: list[LintViolation] = [] clean = True for violation in runner.collect_violations( rules, config, metrics_hook, include_diff=include_diff ): clean = False fix = yield Result(path, violation, source=content, config=config) if fix or autofix: pending_fixes.append(violation) if clean: yield Result(path, violation=None, source=content, config=config) if pending_fixes: updated = runner.apply_replacements(pending_fixes) return format_module(updated, path, config) except Exception as error: # noqa: BLE001 - result conversion boundary # TODO: this is not the right place to catch errors LOG.debug("Exception while linting", exc_info=error) yield Result.from_exception(path, error, source=content, config=config) return None
def rattle_stdin( path: Path, *, autofix: bool = False, include_diff: bool = False, options: Options | None = None, metrics_hook: MetricsHook | None = None, ) -> Generator[Result, bool, None]: """ Wrapper around :func:`rattle_bytes` for formatting content from STDIN. The resulting fixed content will be printed to STDOUT. Requires passing a path that represents the filesystem location matching the contents to be linted. This will be used to resolve the ``pyproject.toml`` configuration. """ path = path.resolve() content: FileContent | None = None config: Config | None = None try: stdin_content = sys.stdin.buffer.read() content = stdin_content config = generate_config(path, options=options, explicit_path=True) if config.excluded: return updated = yield from rattle_bytes( path, stdin_content, config=config, autofix=autofix, include_diff=include_diff, metrics_hook=metrics_hook, ) if autofix: sys.stdout.buffer.write(updated or stdin_content) except Exception as error: # noqa: BLE001 - stdin boundary LOG.debug("Exception while rattle_stdin", exc_info=error) yield Result.from_exception(path, error, source=content, config=config)
[docs] def rattle_file( path: Path, *, autofix: bool = False, include_diff: bool = False, options: Options | None = None, explicit_path: bool = False, metrics_hook: MetricsHook | None = None, ) -> Generator[Result, bool, None]: """ Lint a single file on disk, detecting and generating appropriate configuration. Generates a merged :ref:`configuration` based on all applicable config files. Reads file from disk as raw bytes, and uses :func:`rattle_bytes` to lint and apply any fixes to the content. Writes content back to disk if changes are detected. Yields :class:`Result` objects for each lint error or exception found, or a single empty result if the file is clean. See :func:`rattle_bytes` for semantics. """ path = path.resolve() config: Config | None = None try: config = generate_config(path, options=options, explicit_path=explicit_path) if config.excluded: return yield from rattle_configured_file( path, config=config, autofix=autofix, include_diff=include_diff, options=options, explicit_path=explicit_path, metrics_hook=metrics_hook, ) except Exception as error: # noqa: BLE001 - file boundary LOG.debug("Exception while rattle_file", exc_info=error) yield Result.from_exception( path, error, config=config, )
def rattle_configured_file( path: Path, *, config: Config, autofix: bool = False, include_diff: bool = False, allow_cached_dirty_results: bool = False, deferred_format_paths: list[Path] | None = None, options: Options | None = None, explicit_path: bool = False, metrics_hook: MetricsHook | None = None, ) -> Generator[Result, bool, None]: yield from ConfiguredFileRun( path, config, autofix=autofix, include_diff=include_diff, allow_cached_dirty_results=allow_cached_dirty_results, deferred_format_paths=deferred_format_paths, options=options, explicit_path=explicit_path, metrics_hook=metrics_hook, ).run() def _rattle_file_wrapper( path: Path, *, autofix: bool = False, include_diff: bool = False, options: Options | None = None, explicit_path: bool = False, metrics_hook: MetricsHook | None = None, ) -> list[Result]: """ Wrapper because generators can't be pickled or used directly via multiprocessing TODO: replace this with some sort of queue or whatever. """ return list( rattle_file( path, autofix=autofix, include_diff=include_diff, options=options, explicit_path=explicit_path, metrics_hook=metrics_hook, ) ) def _rattle_configured_file_wrapper( item: ConfiguredPath, *, autofix: bool = False, include_diff: bool = False, allow_cached_dirty_results: bool = False, deferred_format_paths: list[Path] | None = None, options: Options | None = None, metrics_hook: MetricsHook | None = None, ) -> list[Result]: path, config, explicit_path = item return list( rattle_configured_file( path, config=config, autofix=autofix, include_diff=include_diff, allow_cached_dirty_results=allow_cached_dirty_results, deferred_format_paths=deferred_format_paths, options=options, explicit_path=explicit_path, metrics_hook=metrics_hook, ) ) def _rattle_configured_file_batch_wrapper( batch: ConfiguredPathBatch, *, autofix: bool = False, include_diff: bool = False, allow_cached_dirty_results: bool = False, options: Options | None = None, collect_metrics: bool = False, ) -> ConfiguredPathBatchResult: results: list[Result] = [] deferred_format_paths: list[Path] = [] metrics: list[Metrics] = [] metrics_hook = (lambda value: metrics.append(dict(value))) if collect_metrics else None for item in batch: results.extend( _rattle_configured_file_wrapper( item, autofix=autofix, include_diff=include_diff, allow_cached_dirty_results=allow_cached_dirty_results, deferred_format_paths=deferred_format_paths, options=options, metrics_hook=metrics_hook, ) ) return ConfiguredPathBatchResult(results, deferred_format_paths, metrics) def _preload_rules_for_fork(group: Collection[ConfiguredPath]) -> None: seen: set[tuple[object, ...]] = set() for _path, config, _explicit_path in group: key = ( config.root, config.enable_root_import, tuple(str(selector) for selector in config.enable), tuple(str(selector) for selector in config.disable), tuple( sorted( ( rule_name, tuple( sorted( ( option_name, repr(option_value), ) for option_name, option_value in options.items() ) ), ) for rule_name, options in config.options.items() ) ), config.tags, config.python_version, ) if key in seen: continue seen.add(key) collect_rules(config) def _rattle_paths_group( group: list[ConfiguredPath], *, autofix: bool, include_diff: bool, allow_cached_dirty_results: bool, options: Options | None, metrics_hook: MetricsHook | None, ) -> Generator[Result, bool, None]: yield from PathLintRun( (), autofix=autofix, include_diff=include_diff, allow_cached_dirty_results=allow_cached_dirty_results, options=options, metrics_hook=metrics_hook, ).run_configured_group(group) def _format_deferred_paths(paths: list[Path]) -> None: if paths: format_paths(paths, Config(formatter="ruff")) def _configured_paths( pending_paths: list[tuple[Path, bool]], *, options: Options | None, ) -> list[ConfiguredPath]: included_paths: list[ConfiguredPath] = [] for path, explicit_path in pending_paths: config = generate_config(path, options=options, explicit_path=explicit_path) if not config.excluded: included_paths.append((path, config, explicit_path)) return included_paths
[docs] def rattle_paths( paths: Iterable[Path], *, autofix: bool = False, include_diff: bool = False, allow_cached_dirty_results: bool = False, options: Options | None = None, parallel: bool = True, metrics_hook: MetricsHook | None = None, ) -> Generator[Result, bool, None]: """ Lint multiple files or directories, recursively expanding each path. Walks all paths given, obeying any ``.gitignore`` exclusions, finding Python source files. Lints each file found using :func:`rattle_file`, using a process pool when more than one file is being linted. Yields :class:`Result` objects for each path, lint error, or exception found. See :func:`rattle_bytes` for semantics. If the first given path is STDIN (``Path("-")``), then content will be linted from STDIN using :func:`rattle_stdin`. The fixed content will be written to STDOUT. A second path argument may be given, which represents the original content's true path name, and will be used: - to resolve the ``pyproject.toml`` configuration - when printing status messages, diffs, or errors. If no second path argument is given, it will default to "stdin" in the current working directory. Any further path names will result in a runtime error. .. note:: Currently does not support applying individual fixes when ``parallel=True``, due to limitations in the multiprocessing method in use. Setting ``parallel=False`` will enable interactive fixes. Setting ``autofix=True`` will always apply fixes automatically during linting. """ yield from PathLintRun( paths, autofix=autofix, include_diff=include_diff, allow_cached_dirty_results=allow_cached_dirty_results, options=options, parallel=parallel, metrics_hook=metrics_hook, ).run()
__all__ = [ "ConfiguredPath", "ConfiguredPathBatch", "print_result", "rattle_bytes", "rattle_configured_file", "rattle_file", "rattle_paths", "rattle_stdin", ]