Compare commits

...

26 Commits

Author SHA1 Message Date
Rafa de la Torre
4c93258cd2 Prepare release of 0.28.1 2019-07-04 16:09:53 +02:00
Javier Goizueta
bf140890d8 Merge pull request #365 from CartoDB/table-syncer-notmps
Sync tables without using temporary hash tables
2019-07-04 16:05:21 +02:00
Rafa de la Torre
29a31d4c40 Merge pull request #362 from CartoDB/fdw-affected-tables
Make CDB_Get_Foreign_Updated_At robust to missing CDB_TableMetadata
2019-07-04 16:02:46 +02:00
Javier Goizueta
dbd403a2f6 Fix cases of double-quoting identifiers 2019-07-04 12:47:58 +02:00
Javier Goizueta
cb353ec6a8 Add tests for quoted table & column names 2019-07-04 12:47:01 +02:00
Javier Goizueta
2beabfced6 Add test for double schema-quoting 2019-07-03 18:48:49 +02:00
Javier Goizueta
7bdee5c13e Avoid double-quoting
Since dst_schema is a REGNAMESPACE, it is automatically quoted when casted to TEXT
2019-07-03 18:34:54 +02:00
Javier Goizueta
c07784566a NEWS for next release 2019-07-03 17:02:36 +02:00
Javier Goizueta
2e1fe2933c Fix whitespace 2019-07-03 16:56:29 +02:00
Rafa de la Torre
d378ca6fe0 Test for missing foreign CDB_TableMetadata 2019-07-03 16:25:16 +02:00
Rafa de la Torre
446f4113d9 Return NULL instead of NOW()
The absence of foreign CDB_TableMetadata actually means that we cannot
really tell when a remote table was modified.

Therefore we're using NULL with the meaning of "I don't know when it
was last modified".

To be taken in other caching layers, to adjust headers accordingly.
2019-07-03 16:19:46 +02:00
Javier Goizueta
5963c67376 Order the columns of a cartodbfied table consistently
The final order of the columns of a cartodbfied table wasn't uniquely specified, so could vary across PG versions.
This was a problem in particular for having deterministic test results.
2019-07-03 16:16:37 +02:00
Javier Goizueta
f5f18ca57c Fix the order of the columns 2019-07-02 18:14:18 +02:00
Javier Goizueta
d8c840d126 Quote table and column names when necessary
Also use type NAME when appropriate. Note that quoted column names are not NAMES (may be longer).
2019-07-02 18:14:04 +02:00
Javier Goizueta
65483743b4 Remove unused vars 2019-07-02 17:44:06 +02:00
Javier Goizueta
4651883454 Fix order of test results 2019-07-02 17:43:13 +02:00
Javier Goizueta
470bae6268 Merge branch 'master' into table-syncer-notmps
# Conflicts:
#	scripts-available/CDB_SyncTable.sql
2019-07-02 12:21:49 +02:00
Javier Goizueta
75ba13b834 Merge pull request #364 from CartoDB/synctable-doc
Document CDB_SyncTable
2019-07-02 10:20:24 +02:00
Javier Goizueta
057695361d Fix typo 2019-07-01 17:49:42 +02:00
Javier Goizueta
0e1aeb0a76 Minor copy edit 2019-07-01 17:00:22 +02:00
Javier Goizueta
f2dae651b3 More complete sync table example 2019-07-01 16:58:24 +02:00
Javier Goizueta
9f414938ba Document CDB_SyncTable 2019-07-01 16:41:40 +02:00
Rafa de la Torre
b0b4a92240 Add a test case for missing foreign CDB_Tablementadata 2019-06-28 16:19:57 +02:00
Rafa de la Torre
c06d24aa19 Make CDB_Get_Foreign_Updated_At robust to missing CDB_TableMetadata
This may happen with non-carto DB's, when checking the updated_at
times and not finding the corresponding remote.cdb_tablemetadata
imported from the foreign non-carto DB.

Instead of failing, return a NOW() timestampt, so that caching logic
just assumes there may have been changes.

This makes it work today, and leaves open the possibility of adding
the required carto metadata for homogeneous caching in the future.
2019-06-28 16:25:52 +02:00
Javier Goizueta
4f4df2de8d Merge branch 'table-syncer' into table-syncer-notmps 2019-06-28 15:28:06 +02:00
Javier Goizueta
0bcbf6708a Remove usage of temporary tables 2019-06-28 13:49:15 +02:00
9 changed files with 185 additions and 55 deletions

View File

@@ -1,7 +1,7 @@
# cartodb/Makefile
EXTENSION = cartodb
EXTVERSION = 0.28.0
EXTVERSION = 0.28.1
SED = sed
AWK = awk
@@ -100,6 +100,7 @@ UPGRADABLE = \
0.27.1 \
0.27.2 \
0.28.0 \
0.28.1 \
$(EXTVERSION)dev \
$(EXTVERSION)next \
$(END)

View File

@@ -1,3 +1,7 @@
0.28.1 (2019-07-04)
* Avoid temporary tables creation in CDB_SyncTable (#366)
* Make CDB_Get_Foreign_Updated_At robust to missing CDB_TableMetadata (#362)
0.28.0 (2019-07-01)
* New function CDB_SyncTable (#355)

56
doc/CDB_SyncTable.md Normal file
View File

@@ -0,0 +1,56 @@
Synchronize two tables. This function will synchronize a *destination* table with a *source* table.
The idea is that the *destination* is a replica of *source* and *source* has been subject to
modifications that are to be applied to *destination*.
This will be achieved by deleting the rows in the destination not present
in the source, inserting rows of the source not in the destination and updating modified rows.
If the destination table does not exist it will be created and all the rows of the source inserted into it.
Both tables must have a consistent `cartodb_id` primary key column which will be used to match
the source and destination rows.
Note that both tables do not necessarily become identical after the synchronization, since additional columns
may have been added to the destination; those columns will not be altered by the synchronization.
In addition some source columns may be skipped by listing them in the optional last argument; such columns
will not be updated in the destination, so if they are present in it their values won't be altered.
#### Using the function
Import some data using COPY FROM into a temporary table, then synchronize a table with the data and
finally delete the temporary table. This could be used import and update some data periodically while
allowing to add columns to the data that will be preserved across updates.
```sql
CREATE tmp_pois(cartodb_id int, name text, type text, longitude double precision, latitude double precision, rank int);
COPY tmp_pois FROM '/tmp/pois.csv';
SELECT CDB_SyncTable('tmp_pois', 'public', 'pois');
DROP TABLE tmp_pois;
```
Now we could perform some changes to the `pois` to maintain our own ranking:
```sql
UPDATE pois SET rank = random()*4 + 1;
```
Then, if the source were updated at `/tmp/pois.csv` we could synchronize with it while preserving our `rank` values with:
```sql
CREATE tmp_pois(cartodb_id int, name text, type text, longitude double precision, latitude double precision, rank int);
COPY tmp_pois FROM '/tmp/pois.csv';
SELECT CDB_SyncTable('tmp_pois', 'public', 'pois', '{rank}');
DROP TABLE tmp_pois;
```
#### Arguments
```
CDB_SyncTable(src_table, dst_schema, dst_table, skip_cols)
```
* **src_table** REGCLASS the source data for the synchronization
* **dst_scgena** REGNAMESPACE the destination schema
* **dst_table** NAME the destination table to be updated
* **skip_cols** NAME[] an array of column names, empty by default, which will be skipped

View File

@@ -1071,7 +1071,7 @@ BEGIN
-- by selecting their names into an array and
-- joining the array with a comma
SELECT
',' || array_to_string(array_agg(Format('%I',a.attname)),',') AS column_name_sql,
',' || array_to_string(array_agg(Format('%I',a.attname) ORDER BY a.attnum),',') AS column_name_sql,
Count(*) AS count
INTO rec
FROM pg_class c

View File

@@ -125,7 +125,14 @@ BEGIN
-- We assume that the remote cdb_tablemetadata is called cdb_tablemetadata and is on the same schema as the queried table.
SELECT nspname FROM pg_class c, pg_namespace n WHERE c.oid=foreign_table AND c.relnamespace = n.oid INTO fdw_schema_name;
EXECUTE FORMAT('SELECT updated_at FROM %I.cdb_tablemetadata WHERE tabname=%L ORDER BY updated_at DESC LIMIT 1', fdw_schema_name, remote_table_name) INTO time;
BEGIN
EXECUTE FORMAT('SELECT updated_at FROM %I.cdb_tablemetadata WHERE tabname=%L ORDER BY updated_at DESC LIMIT 1', fdw_schema_name, remote_table_name) INTO time;
EXCEPTION
WHEN undefined_table THEN
-- If you add a GET STACKED DIAGNOSTICS text_var = RETURNED_SQLSTATE
-- you get a code 42P01 which corresponds to undefined_table
RAISE NOTICE 'CDB_Get_Foreign_Updated_At: could not find %.cdb_tablemetadata while checking % updated_at, returning NULL timestamp', fdw_schema_name, foreign_table;
END;
RETURN time;
END
$$

View File

@@ -21,7 +21,8 @@ AS $$
LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
WHERE c.oid = src_table::oid
AND pg_catalog.pg_table_is_visible(c.oid)
);
)
ORDER BY a.attnum;
$$ LANGUAGE sql STABLE PARALLEL UNSAFE;
@@ -67,6 +68,21 @@ AS $$
SELECT format('%s_%s_%s', prefix, txid_current(), (random()*1000000)::int)::NAME;
$$ LANGUAGE sql VOLATILE PARALLEL UNSAFE;
/*
Given a table name and an array of column names,
return array of column names qualified with the table name and quoted when necessary
tablename and colnames should be properly quoted, and for this reason the type NAME is not
used for them (with quotes they could exceed the maximum identifier length)
Example of usage:
SELECT @extschema@.__CDB_QualifyColumns('t', ARRAY['a','"b-1"']); --> ARRAY['t.a','t."b-1"']
*/
CREATE OR REPLACE FUNCTION @extschema@.__CDB_QualifyColumns(tablename NAME, colnames NAME[]) RETURNS TEXT[] AS
$$
SELECT array_agg(tablename || '.' || _colname) from unnest(colnames) _colname;
$$ LANGUAGE sql IMMUTABLE PARALLEL SAFE;
/*
A Table Syncer
@@ -88,10 +104,8 @@ DECLARE
fq_dest_table TEXT;
colnames TEXT[];
quoted_colnames TEXT;
src_hash_table_name NAME;
dst_hash_table_name NAME;
dst_colnames TEXT;
src_colnames TEXT;
update_set_clause TEXT;
@@ -101,8 +115,8 @@ DECLARE
t timestamptz;
BEGIN
-- If the destination table does not exist, just copy the source table
fq_dest_table := format('%I.%I', dst_schema, dst_table);
EXECUTE format('CREATE TABLE IF NOT EXISTS %s as TABLE %I', fq_dest_table, src_table);
fq_dest_table := format('%s.%I', dst_schema, dst_table);
EXECUTE format('CREATE TABLE IF NOT EXISTS %s as TABLE %s', fq_dest_table, src_table);
GET DIAGNOSTICS num_rows = ROW_COUNT;
IF num_rows > 0 THEN
RAISE NOTICE 'INSERTED % row(s)', num_rows;
@@ -113,35 +127,12 @@ BEGIN
-- Get the list of columns from the source table, excluding skip_cols
SELECT ARRAY(SELECT quote_ident(c) FROM @extschema@._CDB_GetColumns(src_table) as c EXCEPT SELECT unnest(skip_cols)) INTO colnames;
quoted_colnames := array_to_string(colnames, ',');
src_hash_table_name := @extschema@.__CDB_GenerateUniqueName('src_sync');
dst_hash_table_name := @extschema@.__CDB_GenerateUniqueName('dst_sync');
EXECUTE format('CREATE TEMP TABLE %I(cartodb_id BIGINT, hash TEXT) ON COMMIT DROP', src_hash_table_name);
EXECUTE format('CREATE TEMP TABLE %I(cartodb_id BIGINT, hash TEXT) ON COMMIT DROP', dst_hash_table_name);
-- Compute hash tables for src_table and dst_table h[cartodb_id] = hash(row)
-- It'll take the form of a temp table with an index (easy to run set operations)
t := clock_timestamp();
EXECUTE format('INSERT INTO %I SELECT cartodb_id, md5(ROW(%s)::text) hash FROM %I', src_hash_table_name, quoted_colnames, src_table);
EXECUTE format('INSERT INTO %I SELECT cartodb_id, md5(ROW(%s)::text) hash FROM %s', dst_hash_table_name, quoted_colnames, fq_dest_table);
RAISE DEBUG 'Populate hash tables time (s): %', clock_timestamp() - t;
-- Create indexes
-- We use hash indexes as they are fit for id comparison.
t := clock_timestamp();
EXECUTE format('CREATE INDEX ON %I USING HASH (cartodb_id)', src_hash_table_name);
EXECUTE format('CREATE INDEX ON %I USING HASH (cartodb_id)', dst_hash_table_name);
RAISE DEBUG 'Index creation on hash tables time (s): %', clock_timestamp() - t;
-- Deal with deleted rows: ids in dest but not in source
t := clock_timestamp();
EXECUTE format(
'DELETE FROM %s WHERE cartodb_id IN (SELECT cartodb_id FROM %I EXCEPT SELECT cartodb_id FROM %I)',
fq_dest_table,
dst_hash_table_name,
src_hash_table_name);
'DELETE FROM %1$s _dst WHERE NOT EXISTS (SELECT * FROM %2$s _src WHERE _src.cartodb_id=_dst.cartodb_id)',
fq_dest_table, src_table);
GET DIAGNOSTICS num_rows = ROW_COUNT;
RAISE NOTICE 'DELETED % row(s)', num_rows;
RAISE DEBUG 'DELETE time (s): %', clock_timestamp() - t;
@@ -149,29 +140,26 @@ BEGIN
-- Deal with inserted rows: ids in source but not in dest
t := clock_timestamp();
EXECUTE format('
INSERT INTO %1$s (cartodb_id,%2$s)
SELECT h.cartodb_id,%2$s FROM (SELECT cartodb_id FROM %3$I EXCEPT SELECT cartodb_id FROM %4$I) h
LEFT JOIN %5$I s ON s.cartodb_id = h.cartodb_id;
', fq_dest_table, quoted_colnames, src_hash_table_name, dst_hash_table_name, src_table);
INSERT INTO %1$s(cartodb_id, %2$s)
SELECT cartodb_id, %2$s FROM %3$s _src WHERE NOT EXISTS (SELECT * FROM %1$s _dst WHERE _src.cartodb_id=_dst.cartodb_id)
', fq_dest_table, array_to_string(colnames, ','), src_table);
GET DIAGNOSTICS num_rows = ROW_COUNT;
RAISE NOTICE 'INSERTED % row(s)', num_rows;
RAISE DEBUG 'INSERT time (s): %', clock_timestamp() - t;
-- Deal with modified rows: ids in source and dest but different hashes
t := clock_timestamp();
update_set_clause := @extschema@.__CDB_GetUpdateSetClause(colnames, 'changed');
update_set_clause := @extschema@.__CDB_GetUpdateSetClause(colnames, '_changed');
dst_colnames := array_to_string(@extschema@.__CDB_QualifyColumns('_dst', colnames), ',');
src_colnames := array_to_string(@extschema@.__CDB_QualifyColumns('_src', colnames), ',');
EXECUTE format('
UPDATE %1$s dst SET %2$s
FROM (
SELECT *
FROM %3$s src
WHERE cartodb_id IN
(SELECT sh.cartodb_id FROM %4$I sh
LEFT JOIN %5$I dh ON sh.cartodb_id = dh.cartodb_id
WHERE sh.hash <> dh.hash)
) changed
WHERE dst.cartodb_id = changed.cartodb_id;
', fq_dest_table, update_set_clause, src_table, src_hash_table_name, dst_hash_table_name);
UPDATE %1$s _update SET %2$s
FROM (
SELECT _src.* FROM %3$s _src JOIN %1$s _dst ON (_dst.cartodb_id = _src.cartodb_id)
WHERE md5(ROW(%4$s)::text) <> md5(ROW(%5$s)::text)
) _changed
WHERE _update.cartodb_id = _changed.cartodb_id;
', fq_dest_table, update_set_clause, src_table, dst_colnames, src_colnames);
GET DIAGNOSTICS num_rows = ROW_COUNT;
RAISE NOTICE 'MODIFIED % row(s)', num_rows;
RAISE DEBUG 'UPDATE time (s): %', clock_timestamp() - t;

View File

@@ -50,9 +50,45 @@ UPDATE test_sync_dest SET the_geom = cartodb.CDB_LatLng(lat, lon); -- A "gecodin
\set QUIET off
SET client_min_messages TO notice;
SELECT cartodb.CDB_SyncTable('test_sync_source', 'public', 'test_sync_dest', '{the_geom, the_geom_webmercator}');
SELECT * FROM test_sync_source;
SELECT * FROM test_sync_dest;
SELECT * FROM test_sync_source ORDER BY cartodb_id;
SELECT * FROM test_sync_dest ORDER BY cartodb_id;
\echo 'It will work with schemas that need quoting'
\set QUIET on
SET client_min_messages TO error;
CREATE SCHEMA "sch-ema";
CREATE TABLE "test_sync_source2" AS SELECT * FROM test_sync_source;
\set QUIET off
SELECT cartodb.CDB_SyncTable('test_sync_source2', 'sch-ema', 'test_sync_dest');
INSERT INTO test_sync_source2(cartodb_id, lat, lon, name) VALUES (6, 6.0, 6.0, 'papaya');
DELETE FROM test_sync_source2 WHERE cartodb_id = 4;
UPDATE test_sync_source2 SET lat = 2.5 WHERE cartodb_id = 2;
SET client_min_messages TO notice;
SELECT cartodb.CDB_SyncTable('test_sync_source2', 'sch-ema', 'test_sync_dest');
\echo 'It will work with table names that need quoting'
\set QUIET on
SET client_min_messages TO error;
CREATE TABLE "test-sync-source" AS SELECT * FROM test_sync_source;
\set QUIET off
SELECT cartodb.CDB_SyncTable('test-sync-source', 'public', 'test-sync-dest');
INSERT INTO "test-sync-source"(cartodb_id, lat, lon, name) VALUES (6, 6.0, 6.0, 'papaya');
DELETE FROM "test-sync-source" WHERE cartodb_id = 4;
UPDATE "test-sync-source" SET lat = 2.5 WHERE cartodb_id = 2;
SET client_min_messages TO notice;
SELECT cartodb.CDB_SyncTable('test-sync-source', 'public', 'test-sync-dest');
\echo 'It will work with column names that need quoting'
\set QUIET on
SET client_min_messages TO error;
ALTER TABLE test_sync_source ADD COLUMN "a-column" int;
\set QUIET off
SELECT cartodb.CDB_SyncTable('test_sync_source', 'public', 'test_sync_dest2');
INSERT INTO test_sync_source(cartodb_id, lat, lon, name) VALUES (6, 6.0, 6.0, 'papaya');
DELETE FROM test_sync_source WHERE cartodb_id = 4;
UPDATE test_sync_source SET lat = 2.5 WHERE cartodb_id = 2;
SET client_min_messages TO notice;
SELECT cartodb.CDB_SyncTable('test_sync_source', 'public', 'test_sync_dest2');
-- Cleanup
ROLLBACK;

View File

@@ -52,10 +52,43 @@ NOTICE: MODIFIED 0 row(s)
1|||1|1|foo
2|||2|2|bar
5|||5|5|sandia
4|||4|4|cantaloupe
5|||5|5|sandia
1|0101000020E6100000000000000000F03F000000000000F03F|0101000020110F0000DB0B4ADA772DFB402B432E49D22DFB40|1|1|foo
2|0101000020E610000000000000000000400000000000000040|0101000020110F00003C0C4ADA772D0B4177F404ABE12E0B41|2|2|bar
5|0101000020E610000000000000000014400000000000001440|0101000020110F000099476EE86AFC20413E7EB983F2012141|5|5|sandia
4|0101000020E610000000000000000010400000000000001040|0101000020110F00003C0C4ADA772D1B4160AB497020331B41|4|4|cantaloupe
5|0101000020E610000000000000000014400000000000001440|0101000020110F000099476EE86AFC20413E7EB983F2012141|5|5|sandia
It will work with schemas that need quoting
INSERT 0 1
DELETE 1
UPDATE 1
SET
NOTICE: relation "test_sync_dest" already exists, skipping
NOTICE: DELETED 1 row(s)
NOTICE: INSERTED 1 row(s)
NOTICE: MODIFIED 1 row(s)
It will work with table names that need quoting
INSERT 0 1
DELETE 1
UPDATE 1
SET
NOTICE: relation "test-sync-dest" already exists, skipping
NOTICE: DELETED 1 row(s)
NOTICE: INSERTED 1 row(s)
NOTICE: MODIFIED 1 row(s)
It will work with column names that need quoting
INSERT 0 1
DELETE 1
UPDATE 1
SET
NOTICE: relation "test_sync_dest2" already exists, skipping
NOTICE: DELETED 1 row(s)
NOTICE: INSERTED 1 row(s)
NOTICE: MODIFIED 1 row(s)
ROLLBACK

View File

@@ -586,6 +586,11 @@ test_extension|public|"local-table-with-dashes"'
sql postgres "SELECT cartodb.CDB_Last_Updated_Time(ARRAY['test_extension.public.\"local-table-with-dashes\"']::text[]) < now()" should 't'
sql postgres "SELECT cartodb.CDB_Last_Updated_Time(ARRAY['test_extension.public.\"local-table-with-dashes\"']::text[]) > (now() - interval '1 minute')" should 't'
# Check CDB_Get_Foreign_Updated_At is robust to unimported CDB_TableMetadata
sql postgres "DROP FOREIGN TABLE IF EXISTS test_fdw.cdb_tablemetadata;"
sql postgres "SELECT cartodb.CDB_Get_Foreign_Updated_At('test_fdw.foo') IS NULL" should 't'
# Teardown
DATABASE=fdw_target sql postgres 'REVOKE USAGE ON SCHEMA test_fdw FROM fdw_user;'
DATABASE=fdw_target sql postgres 'REVOKE SELECT ON test_fdw.foo FROM fdw_user;'
DATABASE=fdw_target sql postgres 'REVOKE SELECT ON test_fdw.foo2 FROM fdw_user;'