|
72 | 72 | " self.trigger_name_fn = client.QueryBuilder._quote_ident(trigger_name_fn) \n", |
73 | 73 | "\n", |
74 | 74 | "\n", |
75 | | - " def register(self):\n", |
| 75 | + " def register(self): \n", |
76 | 76 | " with psycopg2.connect(self.service_url) as conn:\n", |
77 | 77 | " with conn.cursor() as cursor:\n", |
78 | 78 | " cursor.execute(f\"\"\"\n", |
|
81 | 81 | " table_exists = cursor.fetchone()[0]\n", |
82 | 82 | " if table_exists:\n", |
83 | 83 | " return\n", |
84 | | - " \n", |
85 | | - " with psycopg2.connect(self.service_url) as conn:\n", |
86 | | - " with conn.cursor() as cursor:\n", |
| 84 | + " \n", |
87 | 85 | " cursor.execute(f\"\"\"\n", |
88 | 86 | " CREATE TABLE {self.schema_name}.{self.work_queue_table_name} (\n", |
89 | 87 | " id int\n", |
|
110 | 108 | " INSERT INTO {self.schema_name}.{self.work_queue_table_name} SELECT {self.id_column_name} FROM {self.schema_name}.{self.table_name};\n", |
111 | 109 | " \"\"\")\n", |
112 | 110 | "\n", |
113 | | - " def process(self, embed_and_write_cb, batch_size:int=10, advisory_prefix=47859, autoregister=True):\n", |
| 111 | + " def process(self, embed_and_write_cb, batch_size:int=10, autoregister=True):\n", |
114 | 112 | " if autoregister:\n", |
115 | 113 | " self.register()\n", |
116 | 114 | " \n", |
117 | 115 | " with psycopg2.connect(self.service_url) as conn:\n", |
118 | 116 | " with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:\n", |
119 | 117 | " cursor.execute(f\"\"\"\n", |
| 118 | + " SELECT to_regclass('{self.schema_name}.{self.work_queue_table_name}')::oid; \n", |
| 119 | + " \"\"\")\n", |
| 120 | + " table_oid = cursor.fetchone()[0]\n", |
| 121 | + " \n", |
| 122 | + " cursor.execute(f\"\"\"\n", |
120 | 123 | " WITH selected_rows AS (\n", |
121 | 124 | " SELECT id\n", |
122 | 125 | " FROM {self.schema_name}.{self.work_queue_table_name}\n", |
123 | 126 | " LIMIT {int(batch_size)}\n", |
124 | 127 | " FOR UPDATE SKIP LOCKED\n", |
125 | 128 | " ), \n", |
126 | 129 | " locked_items AS (\n", |
127 | | - " SELECT id, pg_try_advisory_xact_lock({int(advisory_prefix)}, id) AS locked\n", |
| 130 | + " SELECT id, pg_try_advisory_xact_lock({int(table_oid)}, id) AS locked\n", |
128 | 131 | " FROM (SELECT DISTINCT id FROM selected_rows ORDER BY id) as ids\n", |
129 | 132 | " ),\n", |
130 | 133 | " deleted_rows AS (\n", |
|
0 commit comments