Skip to content

Commit 41d5a1e

Browse files
authored
Merge pull request #7 from ringoldsdev/feat/20250722/loop-method
feat: loop method
2 parents 645b512 + 44f930d commit 41d5a1e

File tree

2 files changed

+138
-0
lines changed

2 files changed

+138
-0
lines changed

laygo/transformers/transformer.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,54 @@ def apply[T](self, t: Callable[[Self], "Transformer[In, T]"]) -> "Transformer[In
176176
"""Apply another pipeline to the current one."""
177177
return t(self)
178178

179+
def loop(
180+
self,
181+
loop_transformer: "Transformer[Out, Out]",
182+
condition: Callable[[list[Out]], bool] | Callable[[list[Out], PipelineContext], bool],
183+
max_iterations: int | None = None,
184+
) -> "Transformer[In, Out]":
185+
"""
186+
Repeatedly applies a transformer to each chunk until a condition is met.
187+
188+
The loop continues as long as the `condition` function returns `True` and
189+
the number of iterations has not reached `max_iterations`. The provided
190+
`loop_transformer` must take a chunk of a certain type and return a chunk
191+
of the same type.
192+
193+
Args:
194+
loop_transformer: The `Transformer` to apply in each iteration. Its
195+
input and output types must match the current pipeline's
196+
output type (`Transformer[Out, Out]`).
197+
condition: A function that takes the current chunk (and optionally
198+
the `PipelineContext`) and returns `True` to continue the
199+
loop, or `False` to stop.
200+
max_iterations: An optional integer to limit the number of repetitions
201+
and prevent infinite loops.
202+
203+
Returns:
204+
The transformer instance for method chaining.
205+
"""
206+
looped_func = loop_transformer.transformer
207+
condition_is_context_aware = is_context_aware(condition)
208+
209+
def operation(chunk: list[Out], ctx: PipelineContext) -> list[Out]:
210+
condition_checker = ( # noqa: E731
211+
lambda current_chunk: condition(current_chunk, ctx) if condition_is_context_aware else condition(current_chunk) # type: ignore
212+
)
213+
214+
current_chunk = chunk
215+
216+
iterations = 0
217+
218+
# The loop now uses the single `condition_checker` function.
219+
while (max_iterations is None or iterations < max_iterations) and condition_checker(current_chunk): # type: ignore
220+
current_chunk = looped_func(current_chunk, ctx)
221+
iterations += 1
222+
223+
return current_chunk
224+
225+
return self._pipe(operation)
226+
179227
def __call__(self, data: Iterable[In], context: PipelineContext | None = None) -> Iterator[Out]:
180228
"""
181229
Executes the transformer on a data source.

tests/test_transformer.py

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,50 @@ def test_tap_side_effects(self):
5757
assert result == [1, 2, 3] # Data unchanged
5858
assert side_effects == [1, 2, 3] # Side effect applied
5959

60+
def test_loop_basic_operation(self):
61+
"""Test loop applies transformer repeatedly until condition is met."""
62+
# Create a loop transformer that adds 1 to each element
63+
increment_transformer = createTransformer(int).map(lambda x: x + 1)
64+
65+
# Continue looping while any element is less than 5
66+
def condition(chunk):
67+
return any(x < 5 for x in chunk)
68+
69+
transformer = createTransformer(int).loop(increment_transformer, condition, max_iterations=10)
70+
result = list(transformer([1, 2, 3]))
71+
72+
# Should increment until all elements are >= 5: [1,2,3] -> [2,3,4] -> [3,4,5] -> [4,5,6] -> [5,6,7]
73+
assert result == [5, 6, 7]
74+
75+
def test_loop_with_max_iterations(self):
76+
"""Test loop respects max_iterations limit."""
77+
# Create a loop transformer that adds 1 to each element
78+
increment_transformer = createTransformer(int).map(lambda x: x + 1)
79+
80+
# Condition that would normally continue indefinitely
81+
def always_true_condition(chunk):
82+
return True
83+
84+
transformer = createTransformer(int).loop(increment_transformer, always_true_condition, max_iterations=3)
85+
result = list(transformer([1, 2, 3]))
86+
87+
# Should stop after 3 iterations: [1,2,3] -> [2,3,4] -> [3,4,5] -> [4,5,6]
88+
assert result == [4, 5, 6]
89+
90+
def test_loop_no_iterations(self):
91+
"""Test loop when condition is false from the start."""
92+
increment_transformer = createTransformer(int).map(lambda x: x + 1)
93+
94+
# Condition that's immediately false
95+
def exit_immediately(chunk):
96+
return False
97+
98+
transformer = createTransformer(int).loop(increment_transformer, exit_immediately)
99+
result = list(transformer([1, 2, 3]))
100+
101+
# Should not iterate at all
102+
assert result == [1, 2, 3]
103+
60104

61105
class TestTransformerContextSupport:
62106
"""Test context-aware transformer operations."""
@@ -139,6 +183,52 @@ def test_tap_with_transformer_and_context(self):
139183
# Side effects: [11,12,13] -> [55,60,65] -> ["processed:55", "processed:60", "processed:65"]
140184
assert side_effects == ["processed:55", "processed:60", "processed:65"]
141185

186+
def test_loop_with_context(self):
187+
"""Test loop with context-aware condition and transformer."""
188+
side_effects = []
189+
context = PipelineContext({"target_sum": 15, "increment": 2})
190+
191+
# Create a context-aware loop transformer that uses context increment
192+
loop_transformer = (
193+
createTransformer(int)
194+
.map(lambda x, ctx: x + ctx["increment"]) # Use context increment
195+
.tap(lambda x, ctx: side_effects.append(f"iteration:{x}")) # Log each iteration
196+
)
197+
198+
# Context-aware condition: continue while sum of chunk is less than target_sum
199+
def condition_with_context(chunk, ctx):
200+
return sum(chunk) < ctx["target_sum"]
201+
202+
main_transformer = createTransformer(int).loop(loop_transformer, condition_with_context, max_iterations=10)
203+
204+
result = list(main_transformer([1, 2, 3], context))
205+
206+
# Initial: [1,2,3] sum=6 < 15, continue
207+
# After 1st: [3,4,5] sum=12 < 15, continue
208+
# After 2nd: [5,6,7] sum=18 >= 15, stop
209+
assert result == [5, 6, 7]
210+
211+
# Should have logged both iterations
212+
assert side_effects == ["iteration:3", "iteration:4", "iteration:5", "iteration:5", "iteration:6", "iteration:7"]
213+
214+
def test_loop_with_context_and_side_effects(self):
215+
"""Test loop with context-aware condition that reads context data."""
216+
context = PipelineContext({"max_value": 20, "increment": 3})
217+
218+
# Simple loop transformer that uses context increment
219+
loop_transformer = createTransformer(int).map(lambda x, ctx: x + ctx["increment"])
220+
221+
# Context-aware condition: continue while max value in chunk is less than context max_value
222+
def condition_with_context(chunk, ctx):
223+
return max(chunk) < ctx["max_value"]
224+
225+
main_transformer = createTransformer(int).loop(loop_transformer, condition_with_context, max_iterations=10)
226+
227+
result = list(main_transformer([5, 8], context))
228+
229+
# [5,8] -> [8,11] -> [11,14] -> [14,17] -> [17,20] (stop because max(17,20) >= 20)
230+
assert result == [17, 20]
231+
142232

143233
class TestTransformerChaining:
144234
"""Test chaining multiple transformer operations."""

0 commit comments

Comments
 (0)