Skip to content

Commit 68b7001

Browse files
authored
Upgrade journal library - fix quoting bug (#7)
1 parent 530464d commit 68b7001

1 file changed

Lines changed: 281 additions & 0 deletions

File tree

scripts/20181029-115650.sql

Lines changed: 281 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,281 @@
1+
create or replace function journal.quote_column(name in varchar) returns varchar language plpgsql as $$
2+
begin
3+
return '"' || name || '"';
4+
end;
5+
$$;
6+
7+
create or replace function journal.refresh_journal_trigger(
8+
p_source_schema_name in varchar,
9+
p_source_table_name in varchar,
10+
p_target_schema_name in varchar = 'journal',
11+
p_target_table_name in varchar = null
12+
) returns varchar language plpgsql as $$
13+
declare
14+
v_insert_trigger_name text;
15+
v_delete_trigger_name text;
16+
begin
17+
v_insert_trigger_name := journal.refresh_journal_insert_trigger(p_source_schema_name, p_source_table_name, p_target_schema_name, coalesce(p_target_table_name, p_source_table_name));
18+
v_delete_trigger_name := journal.refresh_journal_delete_trigger(p_source_schema_name, p_source_table_name, p_target_schema_name, coalesce(p_target_table_name, p_source_table_name));
19+
20+
return v_insert_trigger_name || ' ' || v_delete_trigger_name;
21+
end;
22+
$$;
23+
24+
create or replace function journal.refresh_journal_delete_trigger(
25+
p_source_schema_name in varchar, p_source_table_name in varchar,
26+
p_target_schema_name in varchar, p_target_table_name in varchar
27+
) returns varchar language plpgsql as $$
28+
declare
29+
row record;
30+
v_journal_name text;
31+
v_source_name text;
32+
v_trigger_name text;
33+
v_sql text;
34+
v_target_sql text;
35+
begin
36+
v_journal_name = p_target_schema_name || '.' || p_target_table_name;
37+
v_source_name = p_source_schema_name || '.' || p_source_table_name;
38+
v_trigger_name = p_target_table_name || '_journal_delete_trigger';
39+
-- create the function
40+
v_sql = 'create or replace function ' || v_journal_name || '_delete() returns trigger language plpgsql as ''';
41+
v_sql := v_sql || ' begin ';
42+
v_sql := v_sql || ' insert into ' || v_journal_name || ' (journal_operation';
43+
v_target_sql = 'TG_OP';
44+
45+
for row in (select column_name from information_schema.columns where table_schema = p_source_schema_name and table_name = p_source_table_name order by ordinal_position) loop
46+
v_sql := v_sql || ', ' || journal.quote_column(row.column_name);
47+
v_target_sql := v_target_sql || ', old.' || journal.quote_column(row.column_name);
48+
end loop;
49+
50+
v_sql := v_sql || ') values (' || v_target_sql || '); ';
51+
v_sql := v_sql || ' return null; end; ''';
52+
53+
execute v_sql;
54+
55+
-- create the trigger
56+
v_sql = 'drop trigger if exists ' || v_trigger_name || ' on ' || v_source_name || '; ' ||
57+
'create trigger ' || v_trigger_name || ' after delete on ' || v_source_name ||
58+
' for each row execute procedure ' || v_journal_name || '_delete()';
59+
60+
execute v_sql;
61+
62+
return v_trigger_name;
63+
64+
end;
65+
$$;
66+
67+
create or replace function journal.refresh_journal_insert_trigger(
68+
p_source_schema_name in varchar, p_source_table_name in varchar,
69+
p_target_schema_name in varchar, p_target_table_name in varchar
70+
) returns varchar language plpgsql as $$
71+
declare
72+
row record;
73+
v_journal_name text;
74+
v_source_name text;
75+
v_trigger_name text;
76+
v_first boolean;
77+
v_sql text;
78+
v_target_sql text;
79+
v_name text;
80+
begin
81+
v_journal_name = p_target_schema_name || '.' || p_target_table_name;
82+
v_source_name = p_source_schema_name || '.' || p_source_table_name;
83+
v_trigger_name = p_target_table_name || '_journal_insert_trigger';
84+
-- create the function
85+
v_sql = 'create or replace function ' || v_journal_name || '_insert() returns trigger language plpgsql as ''';
86+
v_sql := v_sql || ' begin ';
87+
88+
for v_name in (select * from journal.primary_key_columns(p_source_schema_name, p_source_table_name)) loop
89+
v_sql := v_sql || ' if (TG_OP=''''UPDATE'''' and (old.' || v_name || ' != new.' || v_name || ')) then';
90+
v_sql := v_sql || ' raise exception ''''Table[' || v_source_name || '] is journaled. Updates to primary key column[' || v_name || '] are not supported as this would make it impossible to follow the history of this row in the journal table[' || v_journal_name || ']'''';';
91+
v_sql := v_sql || ' end if;';
92+
end loop;
93+
94+
v_sql := v_sql || ' insert into ' || v_journal_name || ' (journal_operation';
95+
v_target_sql = 'TG_OP';
96+
97+
for row in (select column_name from information_schema.columns where table_schema = p_source_schema_name and table_name = p_source_table_name order by ordinal_position) loop
98+
v_sql := v_sql || ', ' || journal.quote_column(row.column_name);
99+
v_target_sql := v_target_sql || ', new.' || journal.quote_column(row.column_name);
100+
end loop;
101+
102+
v_sql := v_sql || ') values (' || v_target_sql || '); ';
103+
v_sql := v_sql || ' return null; end; ''';
104+
105+
execute v_sql;
106+
107+
-- create the trigger
108+
v_sql = 'drop trigger if exists ' || v_trigger_name || ' on ' || v_source_name || '; ' ||
109+
'create trigger ' || v_trigger_name || ' after insert or update on ' || v_source_name ||
110+
' for each row execute procedure ' || v_journal_name || '_insert()';
111+
112+
execute v_sql;
113+
114+
return v_trigger_name;
115+
116+
end;
117+
$$;
118+
119+
create or replace function journal.get_data_type_string(
120+
p_column information_schema.columns
121+
) returns varchar language plpgsql as $$
122+
begin
123+
return case p_column.data_type
124+
when 'character' then 'text'
125+
when 'character varying' then 'text'
126+
when '"char"' then 'text'
127+
else p_column.data_type
128+
end;
129+
end;
130+
$$;
131+
132+
create or replace function journal.primary_key_columns(
133+
p_schema_name in varchar,
134+
p_table_name in varchar
135+
) returns setof text language plpgsql AS $$
136+
declare
137+
row record;
138+
begin
139+
for row in (
140+
select key_column_usage.column_name
141+
from information_schema.table_constraints
142+
join information_schema.key_column_usage
143+
on key_column_usage.table_name = table_constraints.table_name
144+
and key_column_usage.table_schema = table_constraints.table_schema
145+
and key_column_usage.constraint_name = table_constraints.constraint_name
146+
where table_constraints.constraint_type = 'PRIMARY KEY'
147+
and table_constraints.table_schema = p_schema_name
148+
and table_constraints.table_name = p_table_name
149+
order by coalesce(key_column_usage.position_in_unique_constraint, 0),
150+
coalesce(key_column_usage.ordinal_position, 0),
151+
key_column_usage.column_name
152+
) loop
153+
return next row.column_name;
154+
end loop;
155+
end;
156+
$$;
157+
158+
159+
create or replace function journal.add_primary_key_data(
160+
p_source_schema_name in varchar, p_source_table_name in varchar,
161+
p_target_schema_name in varchar, p_target_table_name in varchar
162+
) returns void language plpgsql as $$
163+
declare
164+
v_name text;
165+
v_columns character varying := '';
166+
begin
167+
for v_name in (select * from journal.primary_key_columns(p_source_schema_name, p_source_table_name)) loop
168+
if v_columns != '' then
169+
v_columns := v_columns || ', ';
170+
end if;
171+
v_columns := v_columns || v_name;
172+
execute 'alter table ' || p_target_schema_name || '.' || p_target_table_name || ' alter column ' || v_name || ' set not null';
173+
end loop;
174+
175+
if v_columns != '' then
176+
execute 'create index on ' || p_target_schema_name || '.' || p_target_table_name || '(' || v_columns || ')';
177+
end if;
178+
179+
end;
180+
$$;
181+
182+
create or replace function journal.refresh_journaling(
183+
p_source_schema_name in varchar, p_source_table_name in varchar,
184+
p_target_schema_name in varchar, p_target_table_name in varchar
185+
) returns varchar language plpgsql as $$
186+
declare
187+
row record;
188+
v_journal_name text;
189+
v_data_type character varying;
190+
begin
191+
v_journal_name = p_target_schema_name || '.' || p_target_table_name;
192+
if exists(select 1 from information_schema.tables where table_schema = p_target_schema_name and table_name = p_target_table_name) then
193+
for row in (select column_name, journal.get_data_type_string(information_schema.columns.*) as data_type from information_schema.columns where table_schema = p_source_schema_name and table_name = p_source_table_name order by ordinal_position) loop
194+
195+
-- NB: Specifically choosing to not drop deleted columns from the journal table, to preserve the data.
196+
-- There are no constraints (other than not null on primary key columns) on the journaling table
197+
-- columns anyway, so leaving it populated with null will be fine.
198+
select journal.get_data_type_string(information_schema.columns.*) into v_data_type from information_schema.columns where table_schema = p_target_schema_name and table_name = p_target_table_name and column_name = row.column_name;
199+
if not found then
200+
execute 'alter table ' || v_journal_name || ' add ' || journal.quote_column(row.column_name) || ' ' || row.data_type;
201+
elsif (row.data_type != v_data_type) then
202+
execute 'alter table ' || v_journal_name || ' alter column ' || journal.quote_column(row.column_name) || ' type ' || row.data_type;
203+
end if;
204+
205+
end loop;
206+
else
207+
execute 'create table ' || v_journal_name || ' as select * from ' || p_source_schema_name || '.' || p_source_table_name || ' limit 0';
208+
execute 'alter table ' || v_journal_name || ' add journal_timestamp timestamp with time zone not null default now() ';
209+
execute 'alter table ' || v_journal_name || ' add journal_operation text not null ';
210+
execute 'alter table ' || v_journal_name || ' add journal_id bigserial primary key ';
211+
execute 'comment on table ' || v_journal_name || ' is ''Created by plsql function refresh_journaling to shadow all inserts and updates on the table ' || p_source_schema_name || '.' || p_source_table_name || '''';
212+
perform journal.add_primary_key_data(p_source_schema_name, p_source_table_name, p_target_schema_name, p_target_table_name);
213+
end if;
214+
215+
perform journal.refresh_journal_trigger(p_source_schema_name, p_source_table_name, p_target_schema_name, p_target_table_name);
216+
217+
return v_journal_name;
218+
219+
end;
220+
$$;
221+
222+
--note: creating event_triggers requires superuser privileges
223+
create or replace function journal.create_event_trigger(
224+
p_source_schema_name in varchar, p_source_table_name in varchar,
225+
p_target_schema_name in varchar, p_target_table_name in varchar
226+
) returns void language plpgsql as $$
227+
declare
228+
v_journal_name text;
229+
v_source_name text;
230+
v_function_sql text;
231+
v_trigger_sql text;
232+
begin
233+
v_journal_name = p_target_schema_name || '.' || p_target_table_name;
234+
v_source_name = p_source_schema_name || '.' || p_source_table_name;
235+
if exists(select 1 from information_schema.tables where table_schema = p_target_schema_name and table_name = p_target_table_name) then
236+
v_function_sql = 'CREATE OR REPLACE FUNCTION refresh_' || p_source_table_name || '_journal() RETURNS event_trigger AS ''';
237+
v_function_sql := v_function_sql || ' declare ';
238+
v_function_sql := v_function_sql || ' r RECORD; ';
239+
v_function_sql := v_function_sql || ' func_exists boolean; ';
240+
v_function_sql := v_function_sql || 'BEGIN ';
241+
--for postgres 9.4 compatability, so we don't throw errors
242+
v_function_sql := v_function_sql || 'select exists(select * from pg_proc where proname = ''pg_event_trigger_ddl_commands'') into func_exists; ';
243+
v_function_sql := v_function_sql || 'IF func_exists THEN ';
244+
v_function_sql := v_function_sql || ' FOR r IN SELECT * FROM pg_event_trigger_ddl_commands() LOOP ';
245+
v_function_sql := v_function_sql || ' IF r.object_identity = ''' || v_source_name || ''' THEN ';
246+
v_function_sql := v_function_sql || ' perform journal.refresh_journaling(''' || p_source_schema_name || ''', ''' || p_source_table_name || ''', ''' || p_target_schema_name || ''', ''' || p_target_table_name ||'''); ';
247+
v_function_sql := v_function_sql || ' END IF; ';
248+
v_function_sql := v_function_sql || ' END LOOP; ';
249+
v_function_sql := v_function_sql || 'END IF; ';
250+
v_function_sql := v_function_sql || 'END; ';
251+
v_function_sql := v_function_sql || '''';
252+
v_function_sql := v_function_sql || 'LANGUAGE plpgsql;';
253+
254+
v_trigger_sql = 'CREATE EVENT TRIGGER tr_refresh_' || p_source_table_name || 'journal ';
255+
v_trigger_sql := v_trigger_sql || 'ON ddl_command_end WHEN TAG IN (''ALTER TABLE'') ';
256+
v_trigger_sql := v_trigger_sql || 'EXECUTE PROCEDURE refresh_' || p_source_table_name || '_journal(); ';
257+
258+
perform v_function_sql;
259+
perform v_trigger_sql;
260+
261+
else
262+
raise exception 'Unable to create journal event trigger for table without journaling. Use journal.create_journaling instead';
263+
end if;
264+
end;
265+
$$;
266+
267+
268+
--note this requires superuser privileges
269+
create or replace function journal.create_journaling(
270+
p_source_schema_name in varchar, p_source_table_name in varchar,
271+
p_target_schema_name in varchar, p_target_table_name in varchar
272+
) returns varchar language plpgsql as $$
273+
declare
274+
v_journal_name varchar;
275+
begin
276+
select journal.refresh_journaling(p_source_schema_name, p_source_table_name, p_target_schema_name, p_target_table_name) into v_journal_name;
277+
perform journal.create_event_trigger(p_source_schema_name, p_source_table_name, p_target_schema_name, p_target_table_name);
278+
return v_journal_name;
279+
end;
280+
$$;
281+

0 commit comments

Comments
 (0)