Commit cafd088

Authored by: baffelli, pre-commit-ci[bot], despadam, yakutovicha
Multithreading in Python (#171)
* Added first draft on concurrency.
* Added section on the multiprocessing module.
* Added new material on performance.
* Added new material on threads.
* [pre-commit.ci] auto fixes from pre-commit.com hooks (for more information, see https://pre-commit.ci).
* Added exercise.
* Moved document (twice).
* Added section on threads; added skeleton for tests.
* Added an example for threading.
* Fixed test skeleton.
* Trying to run tests for async code.
* Added missing material.
* Added material on async.
* Added material on Locks and Queues.
* Removed unused cells.
* Improved tests of solutions.
* Improved tests.
* Fixed notebook JSON syntax error at exercise 1.
* Updated threads.ipynb (repeatedly; co-authored by Despina Adamopoulou <16343312+despadam@users.noreply.github.com>).
* Moved loading of test suite.
* Added TOC.
* Added quiz.
* Updated tutorial/tests/test_threads.py (co-authored by Despina Adamopoulou).
* Fixes to the text.
* Improvements in cell output.
* Fixed test arguments.
* Improved wording of exercise.
* Improved test comments and methods.
* Updated tutorial/threads.py (co-authored by Despina Adamopoulou).
* Added threads notebook to the index.
* Cosmetic changes.
* Applied suggestions from code review on the text (co-authored by Edoardo Baldi <edoardo.baldi@empa.ch>).
* Made `threading` notebook examples work and added a warning on different modules (#175) (co-authored by Simone Baffelli <simone.baffelli@empa.ch>).
* Added missing example on threading (#176).

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: despadam <despina.adamopoulou@empa.ch>
Co-authored-by: Despina Adamopoulou <16343312+despadam@users.noreply.github.com>
Co-authored-by: Aliaksandr Yakutovich <yakutovicha@gmail.com>
Co-authored-by: Edoardo Baldi <edoardo.baldi@empa.ch>
Parent: b1cea87 · Commit: cafd088

7 files changed: 1343 additions, 8 deletions

File tree

binder/environment.yml

Lines changed: 1 addition & 0 deletions
@@ -17,3 +17,4 @@ dependencies:
   - markdown
   - pre-commit
   - attrs
+  - multiprocess
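The new `multiprocess` dependency is a community fork of the standard-library `multiprocessing` module that serializes with `dill` instead of `pickle`, so it can ship lambdas and interactively defined functions to workers. A minimal sketch of the pool pattern the package mirrors, written here against the standard library only so it runs without the extra dependency:

```python
from multiprocessing import Pool


def square(n: int) -> int:
    """A tiny unit of CPU work to distribute across processes."""
    return n * n


if __name__ == "__main__":
    # Pool.map splits the iterable across worker processes
    with Pool(processes=2) as pool:
        print(pool.map(square, range(5)))  # [0, 1, 4, 9, 16]
```

With `multiprocess` the import line changes to `from multiprocess import Pool` and, unlike the stdlib module, the mapped function could also be a lambda.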

images/process_performance.png

Binary file added (17.6 KB)

index.ipynb

Lines changed: 2 additions & 8 deletions
@@ -19,15 +19,9 @@
     "\n",
     "- [Manage Python project](./manage_python_project.ipynb)\n",
     "- [Advanced functions](./functions_advanced.ipynb)\n",
-    "- [Advanced Object-oriented programming](./object_oriented_programming_advanced.ipynb)\n"
+    "- [Advanced Object-oriented programming](./object_oriented_programming_advanced.ipynb)\n",
+    "- [Parallelism and concurrency in Python](./threads.ipynb)\n"
    ]
-   },
-   {
-    "cell_type": "code",
-    "execution_count": null,
-    "metadata": {},
-    "outputs": [],
-    "source": []
   }
  ],
  "metadata": {

threads.ipynb

Lines changed: 1123 additions & 0 deletions
Large diffs are not rendered by default.

tutorial/tests/test_threads.py

Lines changed: 155 additions & 0 deletions
@@ -0,0 +1,155 @@
import asyncio
import functools
import pathlib
import random
import string
from collections import Counter
from concurrent.futures import ProcessPoolExecutor
from typing import Awaitable, Callable, Dict

import multiprocess
import pytest


class SecretServer:
    def __init__(self, key: str, timeout: float = 0.01):
        self.key = key
        self.inner_key = "/" + key
        self.timeout = timeout
        self.sequence = 0
        self.reset_flag = False
        # Background task that periodically resets the sequence
        self.resetter: asyncio.Task = None

    async def start(self):
        self.resetter = asyncio.create_task(self.reset_sequence())

    async def reset_sequence(self):
        while True:
            await asyncio.sleep(self.timeout)
            self.reset_flag = True

    async def get_value(self):
        # Restart from the separator whenever a reset was requested
        if self.reset_flag:
            self.sequence = 0
            self.reset_flag = False
            return "/"
        await asyncio.sleep(self.timeout / len(self.inner_key) * 1.5)
        seq = self.sequence
        # Increase the sequence counter
        self.sequence = (self.sequence + 1) % len(self.inner_key)
        return self.inner_key[seq]

    async def check_key(self, key: str):
        return key == self.key


@pytest.fixture(scope="session")
def make_random_file(tmp_path_factory: pytest.TempPathFactory) -> Callable:
    def inner_file(size: int = 1000):
        file = tmp_path_factory.mktemp("data").joinpath("file.txt")
        with open(file, "w") as f:
            f.write("".join(random.choices(string.ascii_letters, k=size)))
        return file

    return inner_file


def read_segment(file: pathlib.Path, start: int, end: int) -> str:
    with open(file) as f:
        f.seek(start)
        return f.read(end - start)


def segment_stat(segment: str) -> Dict[str, int]:
    return Counter(segment.strip())


def count_words(
    file: pathlib.Path, size: int, n_processes: int, index: int
) -> Dict[str, int]:
    segment_size = size // n_processes
    start = index * segment_size
    end = start + segment_size
    return segment_stat(read_segment(file, start, end))


def reference_exercise1(input_path: pathlib.Path, size: int) -> Dict[str, int]:
    workers = multiprocess.cpu_count()
    with ProcessPoolExecutor(workers) as executor:
        result = executor.map(
            functools.partial(count_words, input_path, size, workers), range(workers)
        )
    return dict(functools.reduce(lambda x, y: x + y, result, Counter()))


@pytest.mark.parametrize("size", [1000, 10000, 100000])
def test_exercise1_total_counts(
    function_to_test: Callable,
    make_random_file: Callable[[None], pathlib.Path],
    size: int,
):
    rf = make_random_file(size)
    reference_res = reference_exercise1(rf, size)
    total_letters = sum(reference_res.values())
    user_res = function_to_test(rf, size)
    total_letters_user = sum(user_res.values())
    assert total_letters == total_letters_user


@pytest.mark.parametrize("size", [1000, 10000, 100000])
def test_exercise1_counts(
    function_to_test: Callable,
    make_random_file: Callable[[None], pathlib.Path],
    size: int,
):
    rf = make_random_file(size)
    reference_res = reference_exercise1(rf, size)
    user_res = function_to_test(rf, size)
    assert user_res == reference_res


# TODO: find a way to test that the user is using multiprocessing (directly or indirectly)
# def test_exercise1_processes(function_to_test: Callable, make_random_file: Callable[[None], pathlib.Path], monkeypatch: pytest.MonkeyPatch):
#     with patch.object(multiprocessing.Process, "start") as process_mock:
#         size = 1000
#         rf = make_random_file(size)
#         user_res = function_to_test(rf, size)
#         assert process_mock.mock_calls or


def find_word(letters: list[str], separator: str) -> list[str]:
    """
    This function finds the words in a list of letters separated by a separator.
    """
    return [w for w in "".join(letters).split(separator) if len(w) > 0]


async def reference_exercise2(server: SecretServer) -> str:
    rng = 50
    # Concurrently get 50 letters from the server
    letters = await asyncio.gather(*[server.get_value() for _ in range(rng)])

    # Helper to concurrently check if a key is valid
    async def check_key(key: str):
        valid = await server.check_key(key)
        return valid, key

    res = await asyncio.gather(*[check_key(key) for key in find_word(letters, "/")])
    # Return the first valid key
    return [key for valid, key in res if valid][0]


@pytest.mark.parametrize("secret_key", ["Secret", "Very secret", "Extremely secret"])
def test_exercise2(function_to_test: Callable[[None], Awaitable[str]], secret_key: str):
    server = SecretServer(secret_key, timeout=1)

    async def run_test() -> str:
        await server.start()
        res = await function_to_test(server)
        return res

    res = asyncio.run(run_test())
    print(res, secret_key)
    assert secret_key == res
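The reference solution above leans on `asyncio.gather` to issue many `get_value` calls concurrently. A minimal, self-contained sketch of that pattern (the `fetch` coroutine here is illustrative, not from the tutorial):

```python
import asyncio


async def fetch(i: int) -> int:
    # Pretend to do some I/O-bound work
    await asyncio.sleep(0.01)
    return i * 2


async def main() -> list[int]:
    # gather schedules all coroutines concurrently and preserves input order
    return await asyncio.gather(*[fetch(i) for i in range(5)])


print(asyncio.run(main()))  # [0, 2, 4, 6, 8]
```

Because the coroutines overlap, the whole batch completes in roughly one sleep interval rather than five.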

tutorial/threads.py

Lines changed: 62 additions & 0 deletions
@@ -0,0 +1,62 @@
import os
from concurrent.futures import ProcessPoolExecutor
from time import sleep

from .common import Question, Quiz


class Threads(Quiz):
    def __init__(self, title="Decide if the following are parallel or not"):
        q1 = Question(
            question="One cashier serves two lines of people in a store",
            options={
                "Parallel": "What if the cashier is slow?",
                "Not parallel": "Correct, there's only one cashier",
            },
            correct_answer="Not parallel",
            shuffle=True,
        )

        q2 = Question(
            question="A swimming pool offers multiple shower stalls",
            options={
                "Parallel": "Correct!",
                "Not parallel": "We have more than one shower",
            },
            correct_answer="Parallel",
            shuffle=True,
        )

        q3 = Question(
            question="Multiple people take turns drinking from a cup",
            options={
                "Parallel": "Why are they sharing a cup?",
                "Not parallel": "Correct!",
            },
            correct_answer="Not parallel",
            shuffle=True,
        )

        super().__init__(questions=[q1, q2, q3])


def work(n: int, show: bool = False) -> int:
    """This function waits a short time and returns the number"""
    pid = os.getpid()
    if show:
        print(f"{pid} Working on {n}\n")
    sleep(0.001)
    return n


def parallel_work(executor: ProcessPoolExecutor, n: int, batch_size=5) -> int:
    """Wrapper function to run the `work` function in parallel and compute the sum of the results"""
    res = executor.map(work, range(n), chunksize=batch_size)
    return sum(res)


def sequential_work(n: int) -> int:
    """
    This function computes the sum of the results of the `work` function sequentially
    """
    return sum(work(i) for i in range(n))
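A hedged sketch of how the helpers above might be driven for the performance comparison (redefined here, without the `Quiz` dependency, so the snippet is self-contained; both paths must produce the same sum, here 0 + 1 + … + 99):

```python
from concurrent.futures import ProcessPoolExecutor
from time import sleep


def work(n: int) -> int:
    sleep(0.001)  # simulate a small unit of work
    return n


def sequential_work(n: int) -> int:
    # One call after another in a single process
    return sum(work(i) for i in range(n))


def parallel_work(executor: ProcessPoolExecutor, n: int, batch_size: int = 5) -> int:
    # map distributes the calls over the pool; chunksize batches them
    # to amortize inter-process communication overhead
    return sum(executor.map(work, range(n), chunksize=batch_size))


if __name__ == "__main__":
    n = 100
    with ProcessPoolExecutor(max_workers=4) as executor:
        assert parallel_work(executor, n) == sequential_work(n) == n * (n - 1) // 2
    print("sequential and parallel sums agree")
```

Note that `ProcessPoolExecutor` must be able to pickle the mapped function, which is why `work` is defined at module level rather than inside another function.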
