Merge pull request #365 from CartoDB/table-syncer-notmps
Sync tables without using temporary hash tables
This commit is contained in:
3
NEWS.md
3
NEWS.md
@@ -1,3 +1,6 @@
|
||||
x.y.z (YYYY-MM-DD)
|
||||
* Avoid temporary tables creation in CDB_SyncTable (#366)
|
||||
|
||||
0.28.0 (2019-07-01)
|
||||
* New function CDB_SyncTable (#355)
|
||||
|
||||
|
||||
@@ -1071,7 +1071,7 @@ BEGIN
|
||||
-- by selecting their names into an array and
|
||||
-- joining the array with a comma
|
||||
SELECT
|
||||
',' || array_to_string(array_agg(Format('%I',a.attname)),',') AS column_name_sql,
|
||||
',' || array_to_string(array_agg(Format('%I',a.attname) ORDER BY a.attnum),',') AS column_name_sql,
|
||||
Count(*) AS count
|
||||
INTO rec
|
||||
FROM pg_class c
|
||||
|
||||
@@ -21,7 +21,8 @@ AS $$
|
||||
LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
|
||||
WHERE c.oid = src_table::oid
|
||||
AND pg_catalog.pg_table_is_visible(c.oid)
|
||||
);
|
||||
)
|
||||
ORDER BY a.attnum;
|
||||
$$ LANGUAGE sql STABLE PARALLEL UNSAFE;
|
||||
|
||||
|
||||
@@ -67,6 +68,21 @@ AS $$
|
||||
SELECT format('%s_%s_%s', prefix, txid_current(), (random()*1000000)::int)::NAME;
|
||||
$$ LANGUAGE sql VOLATILE PARALLEL UNSAFE;
|
||||
|
||||
/*
|
||||
Given a table name and an array of column names,
|
||||
return array of column names qualified with the table name and quoted when necessary
|
||||
tablename and colnames should be properly quoted, and for this reason the type NAME is not
|
||||
used for them (with quotes they could exceed the maximum identifier length)
|
||||
|
||||
Example of usage:
|
||||
|
||||
SELECT @extschema@.__CDB_QualifyColumns('t', ARRAY['a','"b-1"']); --> ARRAY['t.a','t."b-1"']
|
||||
|
||||
*/
|
||||
CREATE OR REPLACE FUNCTION @extschema@.__CDB_QualifyColumns(tablename NAME, colnames NAME[]) RETURNS TEXT[] AS
|
||||
$$
|
||||
SELECT array_agg(tablename || '.' || _colname) from unnest(colnames) _colname;
|
||||
$$ LANGUAGE sql IMMUTABLE PARALLEL SAFE;
|
||||
|
||||
/*
|
||||
A Table Syncer
|
||||
@@ -88,10 +104,8 @@ DECLARE
|
||||
fq_dest_table TEXT;
|
||||
|
||||
colnames TEXT[];
|
||||
quoted_colnames TEXT;
|
||||
|
||||
src_hash_table_name NAME;
|
||||
dst_hash_table_name NAME;
|
||||
dst_colnames TEXT;
|
||||
src_colnames TEXT;
|
||||
|
||||
update_set_clause TEXT;
|
||||
|
||||
@@ -101,8 +115,8 @@ DECLARE
|
||||
t timestamptz;
|
||||
BEGIN
|
||||
-- If the destination table does not exist, just copy the source table
|
||||
fq_dest_table := format('%I.%I', dst_schema, dst_table);
|
||||
EXECUTE format('CREATE TABLE IF NOT EXISTS %s as TABLE %I', fq_dest_table, src_table);
|
||||
fq_dest_table := format('%s.%I', dst_schema, dst_table);
|
||||
EXECUTE format('CREATE TABLE IF NOT EXISTS %s as TABLE %s', fq_dest_table, src_table);
|
||||
GET DIAGNOSTICS num_rows = ROW_COUNT;
|
||||
IF num_rows > 0 THEN
|
||||
RAISE NOTICE 'INSERTED % row(s)', num_rows;
|
||||
@@ -113,35 +127,12 @@ BEGIN
|
||||
|
||||
-- Get the list of columns from the source table, excluding skip_cols
|
||||
SELECT ARRAY(SELECT quote_ident(c) FROM @extschema@._CDB_GetColumns(src_table) as c EXCEPT SELECT unnest(skip_cols)) INTO colnames;
|
||||
quoted_colnames := array_to_string(colnames, ',');
|
||||
|
||||
src_hash_table_name := @extschema@.__CDB_GenerateUniqueName('src_sync');
|
||||
dst_hash_table_name := @extschema@.__CDB_GenerateUniqueName('dst_sync');
|
||||
|
||||
EXECUTE format('CREATE TEMP TABLE %I(cartodb_id BIGINT, hash TEXT) ON COMMIT DROP', src_hash_table_name);
|
||||
EXECUTE format('CREATE TEMP TABLE %I(cartodb_id BIGINT, hash TEXT) ON COMMIT DROP', dst_hash_table_name);
|
||||
|
||||
-- Compute hash tables for src_table and dst_table h[cartodb_id] = hash(row)
|
||||
-- It'll take the form of a temp table with an index (easy to run set operations)
|
||||
t := clock_timestamp();
|
||||
EXECUTE format('INSERT INTO %I SELECT cartodb_id, md5(ROW(%s)::text) hash FROM %I', src_hash_table_name, quoted_colnames, src_table);
|
||||
EXECUTE format('INSERT INTO %I SELECT cartodb_id, md5(ROW(%s)::text) hash FROM %s', dst_hash_table_name, quoted_colnames, fq_dest_table);
|
||||
RAISE DEBUG 'Populate hash tables time (s): %', clock_timestamp() - t;
|
||||
|
||||
-- Create indexes
|
||||
-- We use hash indexes as they are fit for id comparison.
|
||||
t := clock_timestamp();
|
||||
EXECUTE format('CREATE INDEX ON %I USING HASH (cartodb_id)', src_hash_table_name);
|
||||
EXECUTE format('CREATE INDEX ON %I USING HASH (cartodb_id)', dst_hash_table_name);
|
||||
RAISE DEBUG 'Index creation on hash tables time (s): %', clock_timestamp() - t;
|
||||
|
||||
-- Deal with deleted rows: ids in dest but not in source
|
||||
t := clock_timestamp();
|
||||
EXECUTE format(
|
||||
'DELETE FROM %s WHERE cartodb_id IN (SELECT cartodb_id FROM %I EXCEPT SELECT cartodb_id FROM %I)',
|
||||
fq_dest_table,
|
||||
dst_hash_table_name,
|
||||
src_hash_table_name);
|
||||
'DELETE FROM %1$s _dst WHERE NOT EXISTS (SELECT * FROM %2$s _src WHERE _src.cartodb_id=_dst.cartodb_id)',
|
||||
fq_dest_table, src_table);
|
||||
GET DIAGNOSTICS num_rows = ROW_COUNT;
|
||||
RAISE NOTICE 'DELETED % row(s)', num_rows;
|
||||
RAISE DEBUG 'DELETE time (s): %', clock_timestamp() - t;
|
||||
@@ -149,29 +140,26 @@ BEGIN
|
||||
-- Deal with inserted rows: ids in source but not in dest
|
||||
t := clock_timestamp();
|
||||
EXECUTE format('
|
||||
INSERT INTO %1$s (cartodb_id,%2$s)
|
||||
SELECT h.cartodb_id,%2$s FROM (SELECT cartodb_id FROM %3$I EXCEPT SELECT cartodb_id FROM %4$I) h
|
||||
LEFT JOIN %5$I s ON s.cartodb_id = h.cartodb_id;
|
||||
', fq_dest_table, quoted_colnames, src_hash_table_name, dst_hash_table_name, src_table);
|
||||
INSERT INTO %1$s(cartodb_id, %2$s)
|
||||
SELECT cartodb_id, %2$s FROM %3$s _src WHERE NOT EXISTS (SELECT * FROM %1$s _dst WHERE _src.cartodb_id=_dst.cartodb_id)
|
||||
', fq_dest_table, array_to_string(colnames, ','), src_table);
|
||||
GET DIAGNOSTICS num_rows = ROW_COUNT;
|
||||
RAISE NOTICE 'INSERTED % row(s)', num_rows;
|
||||
RAISE DEBUG 'INSERT time (s): %', clock_timestamp() - t;
|
||||
|
||||
-- Deal with modified rows: ids in source and dest but different hashes
|
||||
t := clock_timestamp();
|
||||
update_set_clause := @extschema@.__CDB_GetUpdateSetClause(colnames, 'changed');
|
||||
update_set_clause := @extschema@.__CDB_GetUpdateSetClause(colnames, '_changed');
|
||||
dst_colnames := array_to_string(@extschema@.__CDB_QualifyColumns('_dst', colnames), ',');
|
||||
src_colnames := array_to_string(@extschema@.__CDB_QualifyColumns('_src', colnames), ',');
|
||||
EXECUTE format('
|
||||
UPDATE %1$s dst SET %2$s
|
||||
FROM (
|
||||
SELECT *
|
||||
FROM %3$s src
|
||||
WHERE cartodb_id IN
|
||||
(SELECT sh.cartodb_id FROM %4$I sh
|
||||
LEFT JOIN %5$I dh ON sh.cartodb_id = dh.cartodb_id
|
||||
WHERE sh.hash <> dh.hash)
|
||||
) changed
|
||||
WHERE dst.cartodb_id = changed.cartodb_id;
|
||||
', fq_dest_table, update_set_clause, src_table, src_hash_table_name, dst_hash_table_name);
|
||||
UPDATE %1$s _update SET %2$s
|
||||
FROM (
|
||||
SELECT _src.* FROM %3$s _src JOIN %1$s _dst ON (_dst.cartodb_id = _src.cartodb_id)
|
||||
WHERE md5(ROW(%4$s)::text) <> md5(ROW(%5$s)::text)
|
||||
) _changed
|
||||
WHERE _update.cartodb_id = _changed.cartodb_id;
|
||||
', fq_dest_table, update_set_clause, src_table, dst_colnames, src_colnames);
|
||||
GET DIAGNOSTICS num_rows = ROW_COUNT;
|
||||
RAISE NOTICE 'MODIFIED % row(s)', num_rows;
|
||||
RAISE DEBUG 'UPDATE time (s): %', clock_timestamp() - t;
|
||||
|
||||
@@ -50,9 +50,45 @@ UPDATE test_sync_dest SET the_geom = cartodb.CDB_LatLng(lat, lon); -- A "gecodin
|
||||
\set QUIET off
|
||||
SET client_min_messages TO notice;
|
||||
SELECT cartodb.CDB_SyncTable('test_sync_source', 'public', 'test_sync_dest', '{the_geom, the_geom_webmercator}');
|
||||
SELECT * FROM test_sync_source;
|
||||
SELECT * FROM test_sync_dest;
|
||||
SELECT * FROM test_sync_source ORDER BY cartodb_id;
|
||||
SELECT * FROM test_sync_dest ORDER BY cartodb_id;
|
||||
|
||||
\echo 'It will work with schemas that need quoting'
|
||||
\set QUIET on
|
||||
SET client_min_messages TO error;
|
||||
CREATE SCHEMA "sch-ema";
|
||||
CREATE TABLE "test_sync_source2" AS SELECT * FROM test_sync_source;
|
||||
\set QUIET off
|
||||
SELECT cartodb.CDB_SyncTable('test_sync_source2', 'sch-ema', 'test_sync_dest');
|
||||
INSERT INTO test_sync_source2(cartodb_id, lat, lon, name) VALUES (6, 6.0, 6.0, 'papaya');
|
||||
DELETE FROM test_sync_source2 WHERE cartodb_id = 4;
|
||||
UPDATE test_sync_source2 SET lat = 2.5 WHERE cartodb_id = 2;
|
||||
SET client_min_messages TO notice;
|
||||
SELECT cartodb.CDB_SyncTable('test_sync_source2', 'sch-ema', 'test_sync_dest');
|
||||
|
||||
\echo 'It will work with table names that need quoting'
|
||||
\set QUIET on
|
||||
SET client_min_messages TO error;
|
||||
CREATE TABLE "test-sync-source" AS SELECT * FROM test_sync_source;
|
||||
\set QUIET off
|
||||
SELECT cartodb.CDB_SyncTable('test-sync-source', 'public', 'test-sync-dest');
|
||||
INSERT INTO "test-sync-source"(cartodb_id, lat, lon, name) VALUES (6, 6.0, 6.0, 'papaya');
|
||||
DELETE FROM "test-sync-source" WHERE cartodb_id = 4;
|
||||
UPDATE "test-sync-source" SET lat = 2.5 WHERE cartodb_id = 2;
|
||||
SET client_min_messages TO notice;
|
||||
SELECT cartodb.CDB_SyncTable('test-sync-source', 'public', 'test-sync-dest');
|
||||
|
||||
\echo 'It will work with column names that need quoting'
|
||||
\set QUIET on
|
||||
SET client_min_messages TO error;
|
||||
ALTER TABLE test_sync_source ADD COLUMN "a-column" int;
|
||||
\set QUIET off
|
||||
SELECT cartodb.CDB_SyncTable('test_sync_source', 'public', 'test_sync_dest2');
|
||||
INSERT INTO test_sync_source(cartodb_id, lat, lon, name) VALUES (6, 6.0, 6.0, 'papaya');
|
||||
DELETE FROM test_sync_source WHERE cartodb_id = 4;
|
||||
UPDATE test_sync_source SET lat = 2.5 WHERE cartodb_id = 2;
|
||||
SET client_min_messages TO notice;
|
||||
SELECT cartodb.CDB_SyncTable('test_sync_source', 'public', 'test_sync_dest2');
|
||||
|
||||
-- Cleanup
|
||||
ROLLBACK;
|
||||
|
||||
@@ -52,10 +52,43 @@ NOTICE: MODIFIED 0 row(s)
|
||||
|
||||
1|||1|1|foo
|
||||
2|||2|2|bar
|
||||
5|||5|5|sandia
|
||||
4|||4|4|cantaloupe
|
||||
5|||5|5|sandia
|
||||
1|0101000020E6100000000000000000F03F000000000000F03F|0101000020110F0000DB0B4ADA772DFB402B432E49D22DFB40|1|1|foo
|
||||
2|0101000020E610000000000000000000400000000000000040|0101000020110F00003C0C4ADA772D0B4177F404ABE12E0B41|2|2|bar
|
||||
5|0101000020E610000000000000000014400000000000001440|0101000020110F000099476EE86AFC20413E7EB983F2012141|5|5|sandia
|
||||
4|0101000020E610000000000000000010400000000000001040|0101000020110F00003C0C4ADA772D1B4160AB497020331B41|4|4|cantaloupe
|
||||
5|0101000020E610000000000000000014400000000000001440|0101000020110F000099476EE86AFC20413E7EB983F2012141|5|5|sandia
|
||||
It will work with schemas that need quoting
|
||||
|
||||
INSERT 0 1
|
||||
DELETE 1
|
||||
UPDATE 1
|
||||
SET
|
||||
NOTICE: relation "test_sync_dest" already exists, skipping
|
||||
NOTICE: DELETED 1 row(s)
|
||||
NOTICE: INSERTED 1 row(s)
|
||||
NOTICE: MODIFIED 1 row(s)
|
||||
|
||||
It will work with table names that need quoting
|
||||
|
||||
INSERT 0 1
|
||||
DELETE 1
|
||||
UPDATE 1
|
||||
SET
|
||||
NOTICE: relation "test-sync-dest" already exists, skipping
|
||||
NOTICE: DELETED 1 row(s)
|
||||
NOTICE: INSERTED 1 row(s)
|
||||
NOTICE: MODIFIED 1 row(s)
|
||||
|
||||
It will work with column names that need quoting
|
||||
|
||||
INSERT 0 1
|
||||
DELETE 1
|
||||
UPDATE 1
|
||||
SET
|
||||
NOTICE: relation "test_sync_dest2" already exists, skipping
|
||||
NOTICE: DELETED 1 row(s)
|
||||
NOTICE: INSERTED 1 row(s)
|
||||
NOTICE: MODIFIED 1 row(s)
|
||||
|
||||
ROLLBACK
|
||||
|
||||
Reference in New Issue
Block a user