@@ -27,6 +27,21 @@ class Pipeline[T]:
2727 A Pipeline provides a high-level interface for data processing by chaining
2828 transformers together. It automatically manages a multiprocessing-safe
2929 shared context that can be accessed by all transformers in the chain.
30+
31+ The Pipeline supports both streaming and batch processing patterns, with
32+ built-in support for buffering, branching (fan-out), and parallel processing.
33+
34+ Example:
35+ >>> data = [1, 2, 3, 4, 5]
36+ >>> result = (Pipeline(data)
37+ ... .transform(lambda t: t.filter(lambda x: x % 2 == 0))
38+ ... .transform(lambda t: t.map(lambda x: x * 2))
39+ ... .to_list())
40+ >>> result # [4, 8]
41+
42+ Note:
43+ Most pipeline operations consume the internal iterator, making the
44+ pipeline effectively single-use unless the data source is re-initialized.
3045 """
3146
3247 def __init__ (self , * data : Iterable [T ]) -> None :
@@ -64,14 +79,22 @@ def __del__(self) -> None:
6479 def context (self , ctx : PipelineContext ) -> "Pipeline[T]" :
6580 """Update the pipeline context and store a reference to the original context.
6681
67- When the pipeline finishes processing, the original context will be updated
68- with the final pipeline context data.
82+ The provided context will be used during pipeline execution and any
83+ modifications made by transformers will be synchronized back to the
84+ original context when the pipeline finishes processing.
6985
7086 Args:
71- ctx: The pipeline context to use for this pipeline execution.
87+ ctx: The pipeline context dictionary to use for this pipeline execution.
88+ This should be a mutable dictionary-like object that transformers
89+ can use to share state and communicate.
7290
7391 Returns:
7492 The pipeline instance for method chaining.
93+
94+ Note:
95+ Changes made to the context during pipeline execution will be
96+ automatically synchronized back to the original context object
97+ when the pipeline is destroyed or processing completes.
7598 """
7699 # Store reference to the original context
77100 self ._original_context_ref = ctx
@@ -96,13 +119,21 @@ def transform[U](self, t: Callable[[Transformer[T, T]], Transformer[T, U]]) -> "
96119 """Apply a transformation using a lambda function.
97120
98121 Creates a Transformer under the hood and applies it to the pipeline.
99- This is a shorthand method for simple transformations.
122+ This is a shorthand method for simple transformations that allows
123+ chaining transformer operations in a functional style.
100124
101125 Args:
102126 t: A callable that takes a transformer and returns a transformed transformer.
127+ Typically used with lambda expressions like:
128+ `lambda t: t.map(func).filter(predicate)`
103129
104130 Returns:
105- A new Pipeline with the transformed data.
131+ A new Pipeline with the transformed data type.
132+
133+ Example:
134+ >>> pipeline = Pipeline([1, 2, 3, 4, 5])
135+ >>> result = pipeline.transform(lambda t: t.filter(lambda x: x % 2 == 0).map(lambda x: x * 2))
136+ >>> result.to_list() # [4, 8]
106137 """
107138 # Create a new transformer and apply the transformation function
108139 transformer = t (Transformer [T , T ]())
@@ -125,17 +156,28 @@ def apply[U](
125156 ) -> "Pipeline[U]" :
126157 """Apply a transformer to the current data source.
127158
128- The pipeline's managed context is passed down to the transformer.
159+ This method accepts various types of transformers and applies them to
160+ the pipeline data. The pipeline's managed context is automatically
161+ passed to context-aware transformers.
129162
130163 Args:
131- transformer: Either a Transformer instance or a callable function
132- that processes the data.
164+ transformer: One of the following:
165+ - A Transformer instance (preferred for complex operations)
166+ - A callable function that takes an iterable and returns an iterator
167+ - A context-aware callable that takes an iterable and context
133168
134169 Returns:
135- A new Pipeline with the transformed data.
170+ The same Pipeline instance with transformed data (for method chaining) .
136171
137172 Raises:
138173 TypeError: If the transformer is not a supported type.
174+
175+ Example:
176+ >>> pipeline = Pipeline([1, 2, 3])
177+ >>> # Using a Transformer instance
178+ >>> pipeline.apply(createTransformer(int).map(lambda x: x * 2))
179+ >>> # Using a simple function
180+ >>> pipeline.apply(lambda data: (x * 2 for x in data))
139181 """
140182 match transformer :
141183 case Transformer ():
@@ -157,7 +199,34 @@ def branch(
157199 max_batch_buffer : int = 1 ,
158200 use_queue_chunks : bool = True ,
159201 ) -> dict [str , list [Any ]]:
160- """Forks the pipeline into multiple branches for concurrent, parallel processing."""
202+ """Forks the pipeline into multiple branches for concurrent, parallel processing.
203+
204+ This is a **terminal operation** that implements a fan-out pattern where
205+ the entire dataset is copied to each branch for independent processing.
206+ Each branch processes the complete dataset concurrently using separate
207+ transformers, and results are collected and returned in a dictionary.
208+
209+ Args:
210+ branches: A dictionary where keys are branch names (str) and values
211+ are `Transformer` instances of any subtype.
212+ batch_size: The number of items to batch together when sending data
213+ to branches. Larger batches can improve throughput but
214+ use more memory. Defaults to 1000.
215+ max_batch_buffer: The maximum number of batches to buffer for each
216+ branch queue. Controls memory usage and creates
217+ backpressure. Defaults to 1.
218+ use_queue_chunks: Whether to use passthrough chunking for the
219+ transformers. When True, batches are processed
220+ as chunks. Defaults to True.
221+
222+ Returns:
223+ A dictionary where keys are the branch names and values are lists
224+ of all items processed by that branch's transformer.
225+
226+ Note:
227+ This operation consumes the pipeline's iterator, making subsequent
228+ operations on the same pipeline return empty results.
229+ """
161230 if not branches :
162231 self .consume ()
163232 return {}
@@ -258,48 +327,84 @@ def _producer() -> None:
258327 def __iter__ (self ) -> Iterator [T ]:
259328 """Allow the pipeline to be iterated over.
260329
330+ This makes the Pipeline compatible with Python's iterator protocol,
331+ allowing it to be used in for loops, list comprehensions, and other
332+ contexts that expect an iterable.
333+
261334 Returns:
262335 An iterator over the processed data.
336+
337+ Note:
338+ This operation consumes the pipeline's iterator, making subsequent
339+ operations on the same pipeline return empty results.
263340 """
264341 yield from self .processed_data
265342
266343 def to_list (self ) -> list [T ]:
267344 """Execute the pipeline and return the results as a list.
268345
346+ This is a terminal operation that consumes the pipeline's iterator
347+ and materializes all results into memory.
348+
269349 Returns:
270350 A list containing all processed items from the pipeline.
351+
352+ Note:
353+ This operation consumes the pipeline's iterator, making subsequent
354+ operations on the same pipeline return empty results.
271355 """
272356 return list (self .processed_data )
273357
274358 def each (self , function : PipelineFunction [T ]) -> None :
275359 """Apply a function to each element (terminal operation).
276360
361+ This is a terminal operation that processes each element for side effects
362+ and consumes the pipeline's iterator without returning results.
363+
277364 Args:
278- function: The function to apply to each element.
365+ function: The function to apply to each element. Should be used for
366+ side effects like logging, updating external state, etc.
367+
368+ Note:
369+ This operation consumes the pipeline's iterator, making subsequent
370+ operations on the same pipeline return empty results.
279371 """
280372 for item in self .processed_data :
281373 function (item )
282374
283375 def first (self , n : int = 1 ) -> list [T ]:
284376 """Get the first n elements of the pipeline (terminal operation).
285377
378+ This is a terminal operation that consumes up to n elements from the
379+ pipeline's iterator and returns them as a list.
380+
286381 Args:
287- n: The number of elements to retrieve.
382+ n: The number of elements to retrieve. Must be at least 1.
288383
289384 Returns:
290- A list containing the first n elements.
385+ A list containing the first n elements, or fewer if the pipeline
386+ contains fewer than n elements.
291387
292388 Raises:
293389 AssertionError: If n is less than 1.
390+
391+ Note:
392+ This operation partially consumes the pipeline's iterator. Subsequent
393+ operations will continue from where this operation left off.
294394 """
295395 assert n >= 1 , "n must be at least 1"
296396 return list (itertools .islice (self .processed_data , n ))
297397
298398 def consume (self ) -> None :
299- """Consume the pipeline without returning results.
399+ """Consume the pipeline without returning results (terminal operation).
400+
401+ This is a terminal operation that processes all elements in the pipeline
402+ for their side effects without materializing any results. Useful when
403+ the pipeline operations have side effects and you don't need the results.
300404
301- This is useful when you want to execute the pipeline for side effects
302- without collecting the results.
405+ Note:
406+ This operation consumes the pipeline's iterator, making subsequent
407+ operations on the same pipeline return empty results.
303408 """
304409 for _ in self .processed_data :
305410 pass
0 commit comments