Compare commits

..

21 Commits

Author SHA1 Message Date
Javier Goizueta
4f7b07f922 Merge pull request #363 from CartoDB/fix-sync-schema
Fix cartodb schema references
2019-07-01 16:15:25 +02:00
Javier Goizueta
69cc56b589 Remove schema from public function examples 2019-07-01 15:13:48 +02:00
Javier Goizueta
1028b24333 Fix cartodb schema references 2019-07-01 14:48:42 +02:00
Javier Goizueta
4d1c4f6a22 Release 0.28.0 2019-07-01 10:56:06 +02:00
Javier Goizueta
83ab128a01 Merge pull request #355 from CartoDB/table-syncer
[WIP] Table syncer
2019-07-01 10:49:06 +02:00
Javier Goizueta
b5f36902c5 Merge branch 'master' into table-syncer 2019-06-28 13:50:46 +02:00
Rafa de la Torre
42dc03d77b Qualify functions with extension schema
This avoids some issues with search_path and scripting black magic.
2019-05-30 16:56:12 +02:00
Rafa de la Torre
9254723719 Make format expression more readable
Make an EXECUTE format('', param1, ..., paramN) more readable by
adding the index of the argument to print (`position`) in the format
specifier.
2019-05-29 13:13:30 +02:00
Rafa de la Torre
26ad966ab6 Add some timing info 2019-05-28 16:40:01 +02:00
Rafa de la Torre
a2723a3c90 Exclude certain columns from sync if instructed to do so
For the Geocoding (and in general for LDS use cases) it may come in
handy to exclude geometry columns from the list of stuff to
syncrhonize. Otherwise they may be lost, overwritten with NULL values.
2019-05-28 16:11:56 +02:00
Rafa de la Torre
2f8ea7e4ea Avoid tables name clashing when executing within same transaction
Generate more unique temp table names when the CDB_SyncTable function
is executed multiple times within the same transaction.

When executed in isolation, there will be always an implicit
surrounding transaction.

But when executed several times within the same transaction it can
give an `ERROR:  relation "src_sync_718794" already exists`.

E.g:

```
BEGIN;
SELECT cartodb.CDB_SyncTable('source1', 'public', 'dest1');
SELECT cartodb.CDB_SyncTable('source12, 'public', 'dest2');
COMMIT;
```
2019-05-28 15:39:02 +02:00
Rafa de la Torre
45c21d7f42 Make other tests resilient to CDB_SyncTable test failures
Use a transaction to avoid leaving stuff after our tests, that affect
Quota and User Tables tests.
2019-05-28 15:33:55 +02:00
Rafa de la Torre
f442c21fa4 Fix the test/CDB_AnalysisCheckTest.sql
These tests are failing in PG11:

```
*** /home/travis/build/CartoDB/cartodb-postgresql/expected/test/CDB_AnalysisCheckTest.out	2019-05-27 16:09:45.063543994 +0000
--- /home/travis/build/CartoDB/cartodb-postgresql/results/test/CDB_AnalysisCheckTest.out	2019-05-27 16:12:39.770847666 +0000
***************
*** 5,12 ****
  CREATE TABLE
  CREATE TABLE
  CREATE TABLE
- (analysis_2f13a3dbd7_41bd92976fc6dd97072afe4ee450054f4c0715d5,public,analysis_2f13a3dbd7_41bd92976fc6dd97072afe4ee450054f4c0715d5)
  (analysis_2f13a3dbd7_f00cee44e9e6152b450bde3a92eb9ae0d099da94,public,analysis_2f13a3dbd7_f00cee44e9e6152b450bde3a92eb9ae0d099da94)
  0

  1
--- 5,12 ----
  CREATE TABLE
  CREATE TABLE
  CREATE TABLE
  (analysis_2f13a3dbd7_f00cee44e9e6152b450bde3a92eb9ae0d099da94,public,analysis_2f13a3dbd7_f00cee44e9e6152b450bde3a92eb9ae0d099da94)
+ (analysis_2f13a3dbd7_41bd92976fc6dd97072afe4ee450054f4c0715d5,public,analysis_2f13a3dbd7_41bd92976fc6dd97072afe4ee450054f4c0715d5)
  0

  1
```

The reason for that is that they rely on row ordering that cannot be
guaranteed as per SQL Standard. Forcing that assumed ordering is
enough to get it working again.
2019-05-28 09:34:10 +02:00
Rafa de la Torre
ee9d08a2be Some functional E2E tests for the CDB_SyncTable() function 2019-05-27 17:56:36 +02:00
Rafa de la Torre
7606585672 Perf optimization: use EXCEPT instead of NOT IN
With javitonino's help, greatly reduce the processing time by using
EXCEPT instead of NOT IN, which causes it to use a `HashSetOp Except`
plan on the subqueries rather than a `Seq Scan` on `Materialize`'d
subtables.
2019-05-27 15:20:06 +02:00
Rafa de la Torre
81d0f338cf Create HASH indices on the temp tables 2019-05-27 15:18:13 +02:00
Rafa de la Torre
951f257654 Simplify code by using ON COMMIT DROP
Simplify code by relying on automatic removal of temp tables with ON
COMMIT DROP, so that we avoid the messy EXCEPTION management.
2019-05-27 12:02:12 +02:00
Rafa de la Torre
f461faf0b6 Simplify udpate: use * instead of column list 2019-05-27 11:56:41 +02:00
Rafa de la Torre
a8d57abda6 Update changed rows 2019-05-27 11:54:38 +02:00
Rafa de la Torre
982ddfdeff Helper function to generate UPDATE SET clause 2019-05-27 11:31:07 +02:00
Rafa de la Torre
da4331ac78 WIP: A function to sync tables
It assumes there's a cartodb_id column in both source and target. It
does not perform unnecessary actions. It respects augmented columns in
target table, if they exist. It is meant to be efficient.
2019-05-24 18:38:28 +02:00
7 changed files with 305 additions and 2 deletions

View File

@@ -1,7 +1,7 @@
# cartodb/Makefile
EXTENSION = cartodb
EXTVERSION = 0.27.2
EXTVERSION = 0.28.0
SED = sed
AWK = awk
@@ -99,6 +99,7 @@ UPGRADABLE = \
0.27.0 \
0.27.1 \
0.27.2 \
0.28.0 \
$(EXTVERSION)dev \
$(EXTVERSION)next \
$(END)

View File

@@ -1,3 +1,6 @@
0.28.0 (2019-07-01)
* New function CDB_SyncTable (#355)
0.27.2 (2019-06-21)
* Improvements and fixes in Ghost tables functions (#360)

View File

@@ -0,0 +1,179 @@
/*
Gets the column names of a given table.
Sample usage:
SELECT @extschema@._CDB_GetColumns('public.films');
*/
CREATE OR REPLACE FUNCTION @extschema@._CDB_GetColumns(src_table REGCLASS)
RETURNS SETOF NAME
AS $$
SELECT
a.attname as "colname"
FROM
pg_catalog.pg_attribute a
WHERE
a.attnum > 0
AND NOT a.attisdropped
AND a.attrelid = (
SELECT c.oid
FROM pg_catalog.pg_class c
LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
WHERE c.oid = src_table::oid
AND pg_catalog.pg_table_is_visible(c.oid)
);
$$ LANGUAGE sql STABLE PARALLEL UNSAFE;
/*
Given an array of quoted column names, it generates an UPDATE SET
clause with the following form:
the_geom = changed.the_geom,
id = changed.id,
elevation = changed.elevation
Example of usage:
SELECT @extschema@.__CDB_GetUpdateSetClause('{the_geom, id, elevation}', 'changed');
*/
CREATE OR REPLACE FUNCTION @extschema@.__CDB_GetUpdateSetClause(colnames TEXT[], update_source TEXT)
RETURNS TEXT
AS $$
DECLARE
set_clause_list TEXT[];
col TEXT;
BEGIN
FOREACH col IN ARRAY colnames
LOOP
set_clause_list := array_append(set_clause_list, format('%1$s = %2$s.%1$s', col, update_source));
END lOOP;
RETURN array_to_string(set_clause_list, ', ');
END;
$$ LANGUAGE plpgsql IMMUTABLE PARALLEL SAFE;
/*
Given a prefix, generate a safe unique NAME for a temp table.
Example of usage:
SELECT @extschema@.__CDB_GenerateUniqueName('src_sync'); --> src_sync_718794_120106
*/
CREATE OR REPLACE FUNCTION @extschema@.__CDB_GenerateUniqueName(prefix TEXT)
RETURNS NAME
AS $$
SELECT format('%s_%s_%s', prefix, txid_current(), (random()*1000000)::int)::NAME;
$$ LANGUAGE sql VOLATILE PARALLEL UNSAFE;
/*
A Table Syncer
Assumptions:
- Both tables contain a consistent cartodb_id column
- Destination table has all columns of the source or does not exist
Sample usage:
SELECT CDB_SyncTable('radar_stations', 'public', 'syncdest');
SELECT CDB_SyncTable('test_sync_source', 'public', 'test_sync_dest', '{the_geom, the_geom_webmercator}');
*/
CREATE OR REPLACE FUNCTION @extschema@.CDB_SyncTable(src_table REGCLASS, dst_schema REGNAMESPACE, dst_table NAME, skip_cols NAME[] = '{}')
RETURNS void
AS $$
DECLARE
fq_dest_table TEXT;
colnames TEXT[];
quoted_colnames TEXT;
src_hash_table_name NAME;
dst_hash_table_name NAME;
update_set_clause TEXT;
num_rows BIGINT;
err_context text;
t timestamptz;
BEGIN
-- If the destination table does not exist, just copy the source table
fq_dest_table := format('%I.%I', dst_schema, dst_table);
EXECUTE format('CREATE TABLE IF NOT EXISTS %s as TABLE %I', fq_dest_table, src_table);
GET DIAGNOSTICS num_rows = ROW_COUNT;
IF num_rows > 0 THEN
RAISE NOTICE 'INSERTED % row(s)', num_rows;
RETURN;
END IF;
skip_cols := skip_cols || '{cartodb_id}';
-- Get the list of columns from the source table, excluding skip_cols
SELECT ARRAY(SELECT quote_ident(c) FROM @extschema@._CDB_GetColumns(src_table) as c EXCEPT SELECT unnest(skip_cols)) INTO colnames;
quoted_colnames := array_to_string(colnames, ',');
src_hash_table_name := @extschema@.__CDB_GenerateUniqueName('src_sync');
dst_hash_table_name := @extschema@.__CDB_GenerateUniqueName('dst_sync');
EXECUTE format('CREATE TEMP TABLE %I(cartodb_id BIGINT, hash TEXT) ON COMMIT DROP', src_hash_table_name);
EXECUTE format('CREATE TEMP TABLE %I(cartodb_id BIGINT, hash TEXT) ON COMMIT DROP', dst_hash_table_name);
-- Compute hash tables for src_table and dst_table h[cartodb_id] = hash(row)
-- It'll take the form of a temp table with an index (easy to run set operations)
t := clock_timestamp();
EXECUTE format('INSERT INTO %I SELECT cartodb_id, md5(ROW(%s)::text) hash FROM %I', src_hash_table_name, quoted_colnames, src_table);
EXECUTE format('INSERT INTO %I SELECT cartodb_id, md5(ROW(%s)::text) hash FROM %s', dst_hash_table_name, quoted_colnames, fq_dest_table);
RAISE DEBUG 'Populate hash tables time (s): %', clock_timestamp() - t;
-- Create indexes
-- We use hash indexes as they are fit for id comparison.
t := clock_timestamp();
EXECUTE format('CREATE INDEX ON %I USING HASH (cartodb_id)', src_hash_table_name);
EXECUTE format('CREATE INDEX ON %I USING HASH (cartodb_id)', dst_hash_table_name);
RAISE DEBUG 'Index creation on hash tables time (s): %', clock_timestamp() - t;
-- Deal with deleted rows: ids in dest but not in source
t := clock_timestamp();
EXECUTE format(
'DELETE FROM %s WHERE cartodb_id IN (SELECT cartodb_id FROM %I EXCEPT SELECT cartodb_id FROM %I)',
fq_dest_table,
dst_hash_table_name,
src_hash_table_name);
GET DIAGNOSTICS num_rows = ROW_COUNT;
RAISE NOTICE 'DELETED % row(s)', num_rows;
RAISE DEBUG 'DELETE time (s): %', clock_timestamp() - t;
-- Deal with inserted rows: ids in source but not in dest
t := clock_timestamp();
EXECUTE format('
INSERT INTO %1$s (cartodb_id,%2$s)
SELECT h.cartodb_id,%2$s FROM (SELECT cartodb_id FROM %3$I EXCEPT SELECT cartodb_id FROM %4$I) h
LEFT JOIN %5$I s ON s.cartodb_id = h.cartodb_id;
', fq_dest_table, quoted_colnames, src_hash_table_name, dst_hash_table_name, src_table);
GET DIAGNOSTICS num_rows = ROW_COUNT;
RAISE NOTICE 'INSERTED % row(s)', num_rows;
RAISE DEBUG 'INSERT time (s): %', clock_timestamp() - t;
-- Deal with modified rows: ids in source and dest but different hashes
t := clock_timestamp();
update_set_clause := @extschema@.__CDB_GetUpdateSetClause(colnames, 'changed');
EXECUTE format('
UPDATE %1$s dst SET %2$s
FROM (
SELECT *
FROM %3$s src
WHERE cartodb_id IN
(SELECT sh.cartodb_id FROM %4$I sh
LEFT JOIN %5$I dh ON sh.cartodb_id = dh.cartodb_id
WHERE sh.hash <> dh.hash)
) changed
WHERE dst.cartodb_id = changed.cartodb_id;
', fq_dest_table, update_set_clause, src_table, src_hash_table_name, dst_hash_table_name);
GET DIAGNOSTICS num_rows = ROW_COUNT;
RAISE NOTICE 'MODIFIED % row(s)', num_rows;
RAISE DEBUG 'UPDATE time (s): %', clock_timestamp() - t;
END;
$$ LANGUAGE plpgsql VOLATILE PARALLEL UNSAFE;

View File

@@ -0,0 +1 @@
../scripts-available/CDB_SyncTable.sql

View File

@@ -7,7 +7,7 @@ SELECT _CDB_AnalysisDataSize('public');
CREATE TABLE analysis_2f13a3dbd7_41bd92976fc6dd97072afe4ee450054f4c0715d5(id int);
CREATE TABLE analysis_2f13a3dbd7_f00cee44e9e6152b450bde3a92eb9ae0d099da94(id int);
CREATE TABLE analysis_2f13a3dbd7_f00cee44e9e6152b450bde3a92eb9ae0d099da9(id int);
SELECT _CDB_AnalysisTablesInSchema('public');
SELECT _CDB_AnalysisTablesInSchema('public') t ORDER BY t;
SELECT _CDB_AnalysisDataSize('public');
SELECT CDB_CheckAnalysisQuota('analysis_2f13a3dbd7_f00cee44e9e6152b450bde3a92eb9ae0d099da94');
SELECT CDB_SetUserQuotaInBytes(1);

View File

@@ -0,0 +1,58 @@
-- Setup: create and populate a table to test the syncs
\set QUIET on
BEGIN;
SET client_min_messages TO error;
CREATE TABLE test_sync_source (
cartodb_id bigint,
lat double precision,
lon double precision,
name text
);
INSERT INTO test_sync_source VALUES
(1, 1.0, 1.0, 'foo'),
(2, 2.0, 2.0, 'bar'),
(3, 3.0, 3.0, 'patata'),
(4, 4.0, 4.0, 'melon');
SET client_min_messages TO notice;
\set QUIET off
\echo 'First table sync: it should be simply just copied to the destination'
SELECT cartodb.CDB_SyncTable('test_sync_source', 'public', 'test_sync_dest');
\echo 'Next table sync: there shall be no changes'
SELECT cartodb.CDB_SyncTable('test_sync_source', 'public', 'test_sync_dest');
\echo 'Remove a row from the source and check it is deleted from the dest table'
DELETE FROM test_sync_source WHERE cartodb_id = 3;
SELECT cartodb.CDB_SyncTable('test_sync_source', 'public', 'test_sync_dest');
\echo 'Insert a new row and check that it is inserted in the dest table'
INSERT INTO test_sync_source VALUES (5, 5.0, 5.0, 'sandia');
SELECT cartodb.CDB_SyncTable('test_sync_source', 'public', 'test_sync_dest');
\echo 'Modify row and check that it is modified in the dest table'
UPDATE test_sync_source SET name = 'cantaloupe' WHERE cartodb_id = 4;
SELECT cartodb.CDB_SyncTable('test_sync_source', 'public', 'test_sync_dest');
\echo 'Sanity check: the end result is the same source table'
SELECT * FROM test_sync_source ORDER BY cartodb_id;
SELECT * FROM test_sync_dest ORDER BY cartodb_id;
\echo 'It shall exclude geom columns if instructed to do so'
\set QUIET on
SET client_min_messages TO error;
SELECT cartodb.CDB_SetUserQuotaInBytes(0); -- Set user quota to infinite
SELECT cartodb.CDB_CartodbfyTable('test_sync_source');
SELECT cartodb.CDB_CartodbfyTable('test_sync_dest');
UPDATE test_sync_dest SET the_geom = cartodb.CDB_LatLng(lat, lon); -- A "gecoding"
\set QUIET off
SET client_min_messages TO notice;
SELECT cartodb.CDB_SyncTable('test_sync_source', 'public', 'test_sync_dest', '{the_geom, the_geom_webmercator}');
SELECT * FROM test_sync_source;
SELECT * FROM test_sync_dest;
-- Cleanup
ROLLBACK;

View File

@@ -0,0 +1,61 @@
First table sync: it should be simply just copied to the destination
NOTICE: INSERTED 4 row(s)
Next table sync: there shall be no changes
NOTICE: relation "test_sync_dest" already exists, skipping
NOTICE: DELETED 0 row(s)
NOTICE: INSERTED 0 row(s)
NOTICE: MODIFIED 0 row(s)
Remove a row from the source and check it is deleted from the dest table
DELETE 1
NOTICE: relation "test_sync_dest" already exists, skipping
NOTICE: DELETED 1 row(s)
NOTICE: INSERTED 0 row(s)
NOTICE: MODIFIED 0 row(s)
Insert a new row and check that it is inserted in the dest table
INSERT 0 1
NOTICE: relation "test_sync_dest" already exists, skipping
NOTICE: DELETED 0 row(s)
NOTICE: INSERTED 1 row(s)
NOTICE: MODIFIED 0 row(s)
Modify row and check that it is modified in the dest table
UPDATE 1
NOTICE: relation "test_sync_dest" already exists, skipping
NOTICE: DELETED 0 row(s)
NOTICE: INSERTED 0 row(s)
NOTICE: MODIFIED 1 row(s)
Sanity check: the end result is the same source table
1|1|1|foo
2|2|2|bar
4|4|4|cantaloupe
5|5|5|sandia
1|1|1|foo
2|2|2|bar
4|4|4|cantaloupe
5|5|5|sandia
It shall exclude geom columns if instructed to do so
0
test_sync_source
test_sync_dest
SET
NOTICE: relation "test_sync_dest" already exists, skipping
NOTICE: cdb_invalidate_varnish(public.test_sync_dest) called
NOTICE: DELETED 0 row(s)
NOTICE: cdb_invalidate_varnish(public.test_sync_dest) called
NOTICE: INSERTED 0 row(s)
NOTICE: cdb_invalidate_varnish(public.test_sync_dest) called
NOTICE: MODIFIED 0 row(s)
1|||1|1|foo
2|||2|2|bar
5|||5|5|sandia
4|||4|4|cantaloupe
1|0101000020E6100000000000000000F03F000000000000F03F|0101000020110F0000DB0B4ADA772DFB402B432E49D22DFB40|1|1|foo
2|0101000020E610000000000000000000400000000000000040|0101000020110F00003C0C4ADA772D0B4177F404ABE12E0B41|2|2|bar
5|0101000020E610000000000000000014400000000000001440|0101000020110F000099476EE86AFC20413E7EB983F2012141|5|5|sandia
4|0101000020E610000000000000000010400000000000001040|0101000020110F00003C0C4ADA772D1B4160AB497020331B41|4|4|cantaloupe
ROLLBACK