|
8 | 8 | # Installation: |
9 | 9 | # pip3 install --upgrade opencage asyncio aiohttp backoff tqdm |
10 | 10 |
|
11 | | -import sys, random, time |
| 11 | +import sys |
12 | 12 | import csv |
13 | | -import backoff |
14 | 13 | import asyncio |
15 | 14 | import traceback |
16 | | -from opencage.geocoder import OpenCageGeocode, AioHttpError |
| 15 | +import backoff |
17 | 16 | from tqdm import tqdm |
| 17 | +from opencage.geocoder import OpenCageGeocode, AioHttpError |
18 | 18 |
|
19 | | -api_key = '' |
20 | | -infile = 'file_to_geocode.csv' |
21 | | -outfile = 'file_geocoded.csv' |
| 19 | +API_KEY = '' |
| 20 | +FILENAME_INPUT_CSV = 'file_to_geocode.csv' |
| 21 | +FILENAME_OUTPUT_CSV = 'file_geocoded.csv' |
22 | 22 |
|
23 | | -max_items = 100 # How man lines to read from the input file. Set to 0 for unlimited |
24 | | -num_workers = 3 # For 10 requests per second try 2-5 |
25 | | -timeout = 5 # For individual HTTP requests. In seconds, default is 1 |
26 | | -retry_max_tries = 10 # How often to retry if a HTTP request times out |
27 | | -retry_max_time = 60 # Limit in seconds for retries |
28 | | -show_progress = True # Show progress bar |
| 23 | +MAX_ITEMS = 100 # How many lines to read from the input file. Set to 0 for unlimited
| 24 | +NUM_WORKERS = 3 # For 10 requests per second try 2-5 |
| 25 | +REQUEST_TIMEOUT_SECONDS = 5 # For individual HTTP requests. Default is 1
| 26 | +RETRY_MAX_TRIES = 10 # How often to retry if a HTTP request times out |
| 27 | +RETRY_MAX_TIME = 60 # Limit in seconds for retries |
| 28 | +SHOW_PROGRESS = True # Show progress bar |
29 | 29 |
|
30 | | -csv_writer = csv.writer(open(outfile, 'w', newline='')) |
| 30 | +csv_writer = csv.writer(open(FILENAME_OUTPUT_CSV, 'w', encoding='utf8', newline='')) |
31 | 31 |
|
32 | | -progress_bar = show_progress and tqdm(total=0, position=0, desc="Addresses geocoded", dynamic_ncols=True) |
| 32 | +PROGRESS_BAR = SHOW_PROGRESS and tqdm(total=0, position=0, desc="Addresses geocoded", dynamic_ncols=True) |
33 | 33 |
|
34 | 34 | async def write_one_geocoding_result(geocoding_results, address, address_id): |
35 | | - if geocoding_results != None and len(geocoding_results): |
36 | | - first_result = geocoding_results[0] |
37 | | - row = [ |
38 | | - address_id, |
39 | | - first_result['geometry']['lat'], |
40 | | - first_result['geometry']['lng'], |
41 | | - # Any of the components might be empty: |
42 | | - first_result['components'].get('_type', ''), |
43 | | - first_result['components'].get('country', ''), |
44 | | - first_result['components'].get('county', ''), |
45 | | - first_result['components'].get('city', ''), |
46 | | - first_result['components'].get('postcode', ''), |
47 | | - first_result['components'].get('road', ''), |
48 | | - first_result['components'].get('house_number', ''), |
49 | | - first_result['confidence'], |
50 | | - first_result['formatted'] |
51 | | - ] |
52 | | - |
53 | | - else: |
54 | | - row = [ |
55 | | - address_id, |
56 | | - 0, # not to be confused with https://en.wikipedia.org/wiki/Null_Island |
57 | | - 0, |
58 | | - '', |
59 | | - '', |
60 | | - '', |
61 | | - '', |
62 | | - '', |
63 | | - '', |
64 | | - '', |
65 | | - -1, # confidence values are 1-10 (lowest to highest), use -1 for unknown |
66 | | - '' |
67 | | - ] |
68 | | - sys.stderr.write("not found, writing empty result: %s\n" % address) |
69 | | - csv_writer.writerow(row) |
| 35 | + if geocoding_results is not None and len(geocoding_results): |
| 36 | + first_result = geocoding_results[0] |
| 37 | + row = [ |
| 38 | + address_id, |
| 39 | + first_result['geometry']['lat'], |
| 40 | + first_result['geometry']['lng'], |
| 41 | + # Any of the components might be empty: |
| 42 | + first_result['components'].get('_type', ''), |
| 43 | + first_result['components'].get('country', ''), |
| 44 | + first_result['components'].get('county', ''), |
| 45 | + first_result['components'].get('city', ''), |
| 46 | + first_result['components'].get('postcode', ''), |
| 47 | + first_result['components'].get('road', ''), |
| 48 | + first_result['components'].get('house_number', ''), |
| 49 | + first_result['confidence'], |
| 50 | + first_result['formatted'] |
| 51 | + ] |
| 52 | + |
| 53 | + else: |
| 54 | + row = [ |
| 55 | + address_id, |
| 56 | + 0, # not to be confused with https://en.wikipedia.org/wiki/Null_Island |
| 57 | + 0, |
| 58 | + '', |
| 59 | + '', |
| 60 | + '', |
| 61 | + '', |
| 62 | + '', |
| 63 | + '', |
| 64 | + '', |
| 65 | + -1, # confidence values are 1-10 (lowest to highest), use -1 for unknown |
| 66 | + '' |
| 67 | + ] |
| 68 | + sys.stderr.write(f"not found, writing empty result: {address}\n") |
| 69 | + csv_writer.writerow(row) |
70 | 70 |
|
71 | 71 |
|
72 | 72 | # Backing off 0.4 seconds afters 1 tries calling function <function geocode_one_address |
73 | 73 | # at 0x10dbf5e50> with args ('14464 3RD ST # 4, 91423, CA, USA', '1780245') and kwargs {} |
74 | 74 | def backoff_hdlr(details): |
75 | 75 | sys.stderr.write("Backing off {wait:0.1f} seconds afters {tries} tries " |
76 | | - "calling function {target} with args {args} and kwargs " |
77 | | - "{kwargs}\n".format(**details)) |
| 76 | + "calling function {target} with args {args} and kwargs " |
| 77 | + "{kwargs}\n".format(**details)) |
78 | 78 |
|
79 | 79 | # https://pypi.org/project/backoff/ |
80 | 80 | @backoff.on_exception(backoff.expo, |
81 | | - (asyncio.TimeoutError), |
82 | | - max_time=retry_max_time, # seconds |
83 | | - max_tries=retry_max_tries, |
84 | | - on_backoff=backoff_hdlr) |
| 81 | + (asyncio.TimeoutError), |
| 82 | + max_time=RETRY_MAX_TIME, # seconds |
| 83 | + max_tries=RETRY_MAX_TRIES, |
| 84 | + on_backoff=backoff_hdlr) |
85 | 85 | async def geocode_one_address(address, address_id): |
86 | | - async with OpenCageGeocode(api_key) as geocoder: |
87 | | - # address -> coordinates |
88 | | - # note: you may also want to set other optional parameters like |
89 | | - # countrycode, language, etc |
90 | | - # see the full list: https://opencagedata.com/api#forward-opt |
91 | | - try: |
92 | | - geocoding_results = await geocoder.geocode_async(address, no_annotations=1) |
93 | | - except Exception as e: |
94 | | - geocoding_results = None |
95 | | - traceback.print_exception(e, file=sys.stderr) |
96 | | - |
97 | | - # coordinates -> address, e.g. '40.78,-73.97' => 101, West 91st Street, New York |
98 | | - # lon_lat = address.split(',') |
99 | | - # geocoding_result = await geocoder.reverse_geocode_async(lon_lat[0], lon_lat[1], no_annotations=1) |
100 | | - # returns a single result so we convert it to a list |
101 | | - # geocoding_results = [geocoding_result] |
102 | | - |
103 | | - try: |
104 | | - await write_one_geocoding_result(geocoding_results, address, address_id) |
105 | | - except Exception as e: |
106 | | - traceback.print_exception(e, file=sys.stderr) |
| 86 | + async with OpenCageGeocode(API_KEY) as geocoder: |
| 87 | + # address -> coordinates |
| 88 | + # note: you may also want to set other optional parameters like |
| 89 | + # countrycode, language, etc |
| 90 | + # see the full list: https://opencagedata.com/api#forward-opt |
| 91 | + try: |
| 92 | + geocoding_results = await geocoder.geocode_async(address, no_annotations=1) |
| 93 | + except Exception as exc: |
| 94 | + geocoding_results = None |
| 95 | + traceback.print_exception(exc, file=sys.stderr) |
| 96 | + |
| 97 | + # coordinates -> address, e.g. '40.78,-73.97' => 101, West 91st Street, New York |
| 98 | + # lon_lat = address.split(',') |
| 99 | + # geocoding_result = await geocoder.reverse_geocode_async(lon_lat[0], lon_lat[1], no_annotations=1) |
| 100 | + # returns a single result so we convert it to a list |
| 101 | + # geocoding_results = [geocoding_result] |
| 102 | + |
| 103 | + try: |
| 104 | + await write_one_geocoding_result(geocoding_results, address, address_id) |
| 105 | + except Exception as exc: |
| 106 | + traceback.print_exception(exc, file=sys.stderr) |
107 | 107 |
|
108 | 108 |
|
109 | 109 |
|
110 | 110 | async def run_worker(worker_name, queue): |
111 | | - global progress_bar |
112 | | - sys.stderr.write("Worker %s starts...\n" % worker_name) |
| 111 | + global PROGRESS_BAR |
| 112 | + sys.stderr.write(f"Worker {worker_name} starts...\n") |
113 | 113 |
|
114 | | - while True: |
115 | | - work_item = await queue.get() |
116 | | - address_id = work_item['id'] |
117 | | - address = work_item['address'] |
118 | | - await geocode_one_address(address, address_id) |
| 114 | + while True: |
| 115 | + work_item = await queue.get() |
| 116 | + address_id = work_item['id'] |
| 117 | + address = work_item['address'] |
| 118 | + await geocode_one_address(address, address_id) |
119 | 119 |
|
120 | | - if show_progress: |
121 | | - progress_bar.update(1) |
| 120 | + if SHOW_PROGRESS: |
| 121 | + PROGRESS_BAR.update(1) |
122 | 122 |
|
123 | | - queue.task_done() |
| 123 | + queue.task_done() |
124 | 124 |
|
125 | 125 |
|
126 | 126 |
|
127 | 127 |
|
128 | 128 | async def main(): |
129 | | - global progress_bar |
130 | | - assert sys.version_info >= (3, 7), "Script requires Python 3.7+." |
131 | | - |
132 | | - ## 1. Read CSV into a Queue |
133 | | - ## Each work_item is an address and id. The id will be part of the output, |
134 | | - ## easy to add more settings. Named 'work_item' to avoid the words |
135 | | - ## 'address' or 'task' which are used elsewhere |
136 | | - ## |
137 | | - ## https://docs.python.org/3/library/asyncio-queue.html |
138 | | - ## |
139 | | - queue = asyncio.Queue(maxsize=max_items) |
140 | | - |
141 | | - csv_reader = csv.reader(open(infile, 'r'), strict=True, skipinitialspace=True) |
142 | | - |
143 | | - for row in csv_reader: |
144 | | - if len(row) == 0: |
145 | | - raise Exception("Empty line in input file at line number %d, aborting" % csv_reader.line_num) |
146 | | - |
147 | | - work_item = {'id': row[0], 'address': row[1]} |
148 | | - await queue.put(work_item) |
149 | | - if queue.full(): |
150 | | - break |
151 | | - |
152 | | - sys.stderr.write("%d work_items in queue\n" % queue.qsize()) |
153 | | - |
154 | | - if show_progress: |
155 | | - progress_bar.total = queue.qsize() |
156 | | - progress_bar.refresh() |
157 | | - |
158 | | - ## 2. Create tasks workers. That is coroutines, each taks take work_items |
159 | | - ## from the queue until it's empty. Tasks run in parallel |
160 | | - ## |
161 | | - ## https://docs.python.org/3/library/asyncio-task.html#creating-tasks |
162 | | - ## https://docs.python.org/3/library/asyncio-task.html#coroutine |
163 | | - ## |
164 | | - sys.stderr.write("Creating %d task workers...\n" % num_workers) |
165 | | - tasks = [] |
166 | | - for i in range(num_workers): |
167 | | - task = asyncio.create_task(run_worker(f'worker {i}', queue)) |
168 | | - tasks.append(task) |
169 | | - |
170 | | - |
171 | | - ## 3. Now workers do the geocoding |
172 | | - ## |
173 | | - sys.stderr.write("Now waiting for workers to finish processing queue...\n") |
174 | | - await queue.join() |
175 | | - |
176 | | - |
177 | | - ## 4. Cleanup |
178 | | - ## |
179 | | - for task in tasks: |
180 | | - task.cancel() |
181 | | - |
182 | | - if show_progress: |
183 | | - progress_bar.close() |
184 | | - |
185 | | - sys.stderr.write("All done.\n") |
| 129 | + global PROGRESS_BAR |
| 130 | + assert sys.version_info >= (3, 7), "Script requires Python 3.7+." |
| 131 | + |
| 132 | + ## 1. Read CSV into a Queue |
| 133 | + ## Each work_item is an address and id. The id will be part of the output, |
| 134 | + ## easy to add more settings. Named 'work_item' to avoid the words |
| 135 | + ## 'address' or 'task' which are used elsewhere |
| 136 | + ## |
| 137 | + ## https://docs.python.org/3/library/asyncio-queue.html |
| 138 | + ## |
| 139 | + queue = asyncio.Queue(maxsize=MAX_ITEMS) |
| 140 | + |
| 141 | + csv_reader = csv.reader(open(FILENAME_INPUT_CSV, 'r'), strict=True, skipinitialspace=True) |
| 142 | + |
| 143 | + for row in csv_reader: |
| 144 | + if len(row) == 0: |
| 145 | + raise Exception(f"Empty line in input file at line number {csv_reader.line_num}, aborting") |
| 146 | + |
| 147 | + work_item = {'id': row[0], 'address': row[1]} |
| 148 | + await queue.put(work_item) |
| 149 | + if queue.full(): |
| 150 | + break |
| 151 | + |
| 152 | + sys.stderr.write(f"{queue.qsize()} work_items in queue\n") |
| 153 | + |
| 154 | + if SHOW_PROGRESS: |
| 155 | + PROGRESS_BAR.total = queue.qsize() |
| 156 | + PROGRESS_BAR.refresh() |
| 157 | + |
| 158 | + ## 2. Create task workers. These are coroutines; each task takes work_items
| 159 | + ## from the queue until it's empty. Tasks run in parallel |
| 160 | + ## |
| 161 | + ## https://docs.python.org/3/library/asyncio-task.html#creating-tasks |
| 162 | + ## https://docs.python.org/3/library/asyncio-task.html#coroutine |
| 163 | + ## |
| 164 | + sys.stderr.write(f"Creating {NUM_WORKERS} task workers...\n") |
| 165 | + tasks = [] |
| 166 | + for i in range(NUM_WORKERS): |
| 167 | + task = asyncio.create_task(run_worker(f'worker {i}', queue)) |
| 168 | + tasks.append(task) |
| 169 | + |
| 170 | + |
| 171 | + ## 3. Now workers do the geocoding |
| 172 | + ## |
| 173 | + sys.stderr.write("Now waiting for workers to finish processing queue...\n") |
| 174 | + await queue.join() |
| 175 | + |
| 176 | + |
| 177 | + ## 4. Cleanup |
| 178 | + ## |
| 179 | + for task in tasks: |
| 180 | + task.cancel() |
| 181 | + |
| 182 | + if SHOW_PROGRESS: |
| 183 | + PROGRESS_BAR.close() |
| 184 | + |
| 185 | + sys.stderr.write("All done.\n") |
186 | 186 |
|
187 | 187 |
|
188 | 188 | asyncio.run(main()) |
0 commit comments