Skip to content

Commit 41201cf

Browse files
committed
Upgrade lib-postgresql
1 parent e10b494 commit 41201cf

3 files changed

Lines changed: 172 additions & 1 deletion

File tree

.dockerignore

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
.git
2+
*.md
3+
dist/
4+
!README.md

Dockerfile

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
1-
FROM flowdocker/postgresql:0.0.48
1+
FROM flowdocker/postgresql:0.0.60
22

33
ADD . /opt/schema
44
WORKDIR /opt/schema
55

66
RUN echo "set -x #echo on" >> /opt/run.sh
77
RUN echo "service postgresql start" >> /opt/run.sh
88
RUN echo "sh /opt/schema/install.sh" >> /opt/run.sh
9+
RUN echo "service postgresql stop" >> /opt/run.sh
910

1011
RUN sh /opt/run.sh
1112

scripts/20160823-144517.sql

Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
create or replace function journal.refresh_journaling(
2+
p_source_schema_name in varchar, p_source_table_name in varchar,
3+
p_target_schema_name in varchar, p_target_table_name in varchar
4+
) returns varchar language plpgsql as $$
5+
declare
6+
row record;
7+
v_journal_name text;
8+
v_data_type character varying;
9+
begin
10+
v_journal_name = p_target_schema_name || '.' || p_target_table_name;
11+
if exists(select 1 from information_schema.tables where table_schema = p_target_schema_name and table_name = p_target_table_name) then
12+
for row in (select column_name, journal.get_data_type_string(information_schema.columns.*) as data_type from information_schema.columns where table_schema = p_source_schema_name and table_name = p_source_table_name order by ordinal_position) loop
13+
14+
-- NB: Specifically choosing to not drop deleted columns from the journal table, to preserve the data.
15+
-- There are no constraints (other than not null on primary key columns) on the journaling table
16+
-- columns anyway, so leaving it populated with null will be fine.
17+
select journal.get_data_type_string(information_schema.columns.*) into v_data_type from information_schema.columns where table_schema = p_target_schema_name and table_name = p_target_table_name and column_name = row.column_name;
18+
if not found then
19+
execute 'alter table ' || v_journal_name || ' add ' || row.column_name || ' ' || row.data_type;
20+
elsif (row.data_type != v_data_type) then
21+
execute 'alter table ' || v_journal_name || ' alter column ' || row.column_name || ' type ' || row.data_type;
22+
end if;
23+
24+
end loop;
25+
else
26+
execute 'create table ' || v_journal_name || ' as select * from ' || p_source_schema_name || '.' || p_source_table_name || ' limit 0';
27+
execute 'alter table ' || v_journal_name || ' add journal_timestamp timestamp with time zone not null default now() ';
28+
execute 'alter table ' || v_journal_name || ' add journal_operation text not null ';
29+
execute 'alter table ' || v_journal_name || ' add journal_id bigserial primary key ';
30+
execute 'alter table ' || v_journal_name || ' set (fillfactor=100) ';
31+
execute 'comment on table ' || v_journal_name || ' is ''Created by plsql function refresh_journaling to shadow all inserts and updates on the table ' || p_source_schema_name || '.' || p_source_table_name || '''';
32+
perform journal.add_primary_key_data(p_source_schema_name, p_source_table_name, p_target_schema_name, p_target_table_name);
33+
perform journal.create_prevent_update_trigger(p_target_schema_name, p_target_table_name);
34+
perform journal.create_prevent_delete_trigger(p_target_schema_name, p_target_table_name);
35+
end if;
36+
37+
perform journal.refresh_journal_trigger(p_source_schema_name, p_source_table_name, p_target_schema_name, p_target_table_name);
38+
39+
return v_journal_name;
40+
41+
end;
42+
$$;
43+
44+
create or replace function journal.create_prevent_update_trigger(p_schema_name character varying, p_table_name character varying) returns character varying
45+
language plpgsql
46+
as $$
47+
declare
48+
v_name varchar;
49+
begin
50+
v_name = p_table_name || '_prevent_update_trigger';
51+
execute 'create trigger ' || v_name || ' before update on ' || p_schema_name || '.' || p_table_name || ' for each row execute procedure journal.prevent_update()';
52+
return v_name;
53+
end;
54+
$$;
55+
56+
57+
create schema queue;
58+
set search_path to queue;
59+
60+
CREATE OR REPLACE FUNCTION create_queue(
61+
p_schema_name text,
62+
p_table_name text,
63+
p_queue_schema_name text DEFAULT 'queue',
64+
p_queue_table_name text DEFAULT null
65+
) RETURNS text
66+
LANGUAGE plpgsql
67+
AS $$
68+
declare
69+
v_queue_table_name text;
70+
v_source_name text;
71+
v_procedure_name text;
72+
v_trigger_name text;
73+
v_sql text;
74+
begin
75+
v_queue_table_name = p_queue_schema_name || '.' || coalesce(p_queue_table_name, p_table_name);
76+
v_source_name = p_schema_name || '.' || p_table_name;
77+
78+
v_sql = 'create table ' || v_queue_table_name || '(journal_id bigint primary key, created_at timestamptz default now() not null, processed_at timestamptz, error text)';
79+
execute v_sql;
80+
81+
v_sql = 'create index on ' || v_queue_table_name || '(journal_id) where processed_at is null';
82+
execute v_sql;
83+
84+
85+
perform partman.create_parent(v_queue_table_name, 'created_at', 'time', 'daily');
86+
update partman.part_config
87+
set retention = '1 week',
88+
retention_keep_table = false,
89+
retention_keep_index = false
90+
where parent_table in (v_queue_table_name);
91+
92+
v_procedure_name = p_queue_schema_name || '.' || p_table_name || '_queue_insert';
93+
v_trigger_name = p_table_name || '_queue_insert_trigger';
94+
95+
v_sql = 'create or replace function ' || v_procedure_name || '() returns trigger language plpgsql as ''';
96+
v_sql := v_sql || ' begin ';
97+
v_sql := v_sql || ' insert into ' || v_queue_table_name || ' (journal_id) values (new.journal_id);';
98+
v_sql := v_sql || ' return new; end; ''';
99+
execute v_sql;
100+
101+
-- create the trigger
102+
v_sql = 'drop trigger if exists ' || v_trigger_name || ' on ' || v_source_name || '; ' ||
103+
'create trigger ' || v_trigger_name || ' after insert on ' || v_source_name ||
104+
' for each row execute procedure ' || v_procedure_name || '()';
105+
106+
execute v_sql;
107+
108+
109+
v_procedure_name = p_queue_schema_name || '.' || p_table_name || '_queue_update';
110+
v_trigger_name = p_table_name || '_queue_update_trigger';
111+
112+
v_sql = 'create or replace function ' || v_procedure_name || '() returns trigger language plpgsql as ''';
113+
v_sql := v_sql || ' begin ';
114+
v_sql := v_sql || ' if new.journal_id != old.journal_id then ';
115+
v_sql := v_sql || ' raise ''''The table ' || v_source_name || ' has a queue table - updated to journal_id are not supported'''';';
116+
v_sql := v_sql || ' end if;';
117+
v_sql := v_sql || ' insert into ' || v_queue_table_name || ' (journal_id) values (new.journal_id);';
118+
v_sql := v_sql || ' return new; end; ''';
119+
execute v_sql;
120+
121+
-- create the trigger
122+
v_sql = 'drop trigger if exists ' || v_trigger_name || ' on ' || v_source_name || '; ' ||
123+
'create trigger ' || v_trigger_name || ' after update on ' || v_source_name ||
124+
' for each row execute procedure ' || v_procedure_name || '()';
125+
126+
execute v_sql;
127+
128+
129+
v_procedure_name = p_queue_schema_name || '.' || p_table_name || '_queue_delete';
130+
v_trigger_name = p_table_name || '_queue_delete_trigger';
131+
132+
v_sql = 'create or replace function ' || v_procedure_name || '() returns trigger language plpgsql as ''';
133+
v_sql := v_sql || ' begin ';
134+
v_sql := v_sql || ' insert into ' || v_queue_table_name || ' (journal_id) values (old.journal_id);';
135+
v_sql := v_sql || ' return old; end; ''';
136+
execute v_sql;
137+
138+
-- create the trigger
139+
v_sql = 'drop trigger if exists ' || v_trigger_name || ' on ' || v_source_name || '; ' ||
140+
'create trigger ' || v_trigger_name || ' after delete on ' || v_source_name ||
141+
' for each row execute procedure ' || v_procedure_name || '()';
142+
143+
execute v_sql;
144+
145+
return v_queue_table_name;
146+
end;
147+
$$;
148+
149+
150+
151+
create or replace function upgrade_journal() returns integer language plpgsql as $$
152+
declare
153+
row record;
154+
count integer = 0;
155+
begin
156+
for row in (select table_name from information_schema.tables where table_schema='journal') loop
157+
execute 'ALTER TABLE journal.' || row.table_name || ' SET ( FILLFACTOR = 100 )';
158+
count = count + 1;
159+
end loop;
160+
return count;
161+
end;
162+
$$;
163+
164+
select upgrade_journal();
165+
166+
drop function upgrade_journal();

0 commit comments

Comments
 (0)