Initial implementation of CDB_Federated_Table_Register

This commit is contained in:
Raul Marin
2019-10-30 11:40:35 +01:00
parent 77868a541d
commit a26b96347a
6 changed files with 275 additions and 8 deletions

View File

@@ -209,7 +209,7 @@ BEGIN
EXECUTE FORMAT ('CREATE USER MAPPING FOR public SERVER %I', server_internal);
EXCEPTION WHEN OTHERS THEN
RAISE EXCEPTION 'Could not create server %: %', server, SQLERRM
USING HINT = 'Please clean the remaining objects"';
USING HINT = 'Please clean the left over objects';
END;
END IF;

View File

@@ -18,6 +18,7 @@ DECLARE
inf_schema name := 'information_schema';
remote_table name := 'schemata';
local_schema name := @extschema@.__CDB_FS_Create_Schema(server_internal, inf_schema);
role_name text := @extschema@.__CDB_FS_Generate_Server_Role_Name(server_internal);
BEGIN
-- Import the foreign schemata if not done
IF NOT EXISTS (
@@ -82,18 +83,18 @@ LANGUAGE PLPGSQL VOLATILE PARALLEL UNSAFE;
--
-- List remote schemas in a federated server that the current user has access to.
--
CREATE OR REPLACE FUNCTION @extschema@.CDB_Federated_Server_List_Remote_Schemas(remote_server name)
CREATE OR REPLACE FUNCTION @extschema@.CDB_Federated_Server_List_Remote_Schemas(server TEXT)
RETURNS TABLE(remote_schema name)
AS $$
DECLARE
server_internal name := @extschema@.__CDB_FS_Generate_Server_Name(input_name := remote_server, check_existence := true);
server_internal name := @extschema@.__CDB_FS_Generate_Server_Name(input_name := server, check_existence := true);
server_type name := @extschema@.__CDB_FS_server_type(server_internal);
BEGIN
CASE server_type
WHEN 'postgres_fdw' THEN
RETURN QUERY SELECT @extschema@.__CDB_FS_List_Foreign_Schemas_PG(server_internal);
ELSE
RAISE EXCEPTION 'Not implemented server type % for remote server %', server_type, remote_server;
RAISE EXCEPTION 'Not implemented server type % for remote server %', server_type, server;
END CASE;
END
$$
@@ -102,11 +103,11 @@ LANGUAGE PLPGSQL VOLATILE PARALLEL UNSAFE;
--
-- List remote tables in a federated server that the current user has access to.
--
CREATE OR REPLACE FUNCTION @extschema@.CDB_Federated_Server_List_Remote_Tables(remote_server name, remote_schema name)
CREATE OR REPLACE FUNCTION @extschema@.CDB_Federated_Server_List_Remote_Tables(server TEXT, remote_schema TEXT)
RETURNS TABLE(remote_table name)
AS $$
DECLARE
server_internal name := @extschema@.__CDB_FS_Generate_Server_Name(input_name := remote_server, check_existence := true);
server_internal name := @extschema@.__CDB_FS_Generate_Server_Name(input_name := server, check_existence := true);
server_type name := @extschema@.__CDB_FS_server_type(server_internal);
BEGIN
CASE server_type

View File

@@ -0,0 +1,173 @@
CREATE OR REPLACE FUNCTION @extschema@.__CDB_FS_Column_Is_Integer(input_table REGCLASS, colname NAME)
RETURNS boolean
AS $$
BEGIN
PERFORM atttypid FROM pg_catalog.pg_attribute
WHERE attrelid = input_table
AND attname = colname
AND atttypid IN (SELECT oid FROM pg_type
WHERE typname IN
('smallint', 'integer', 'bigint', 'int2', 'int4', 'int8'));
RETURN FOUND;
END
$$
LANGUAGE PLPGSQL VOLATILE PARALLEL UNSAFE;
CREATE OR REPLACE FUNCTION @extschema@.__CDB_FS_Column_Is_Geometry(input_table REGCLASS, colname NAME)
RETURNS boolean
AS $$
BEGIN
PERFORM atttypid FROM pg_catalog.pg_attribute
WHERE attrelid = input_table
AND attname = colname
AND atttypid = 'geometry'::regtype;
RETURN FOUND;
END
$$
LANGUAGE PLPGSQL VOLATILE PARALLEL UNSAFE;
CREATE OR REPLACE FUNCTION @extschema@.__CDB_FS_GetColumns(input_table REGCLASS)
RETURNS SETOF NAME
AS $$
SELECT
a.attname as "colname"
FROM pg_catalog.pg_attribute a
WHERE
a.attnum > 0
AND NOT a.attisdropped
AND a.attrelid = (
SELECT c.oid
FROM pg_catalog.pg_class c
LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
WHERE c.oid = input_table::oid
)
ORDER BY a.attnum;
$$ LANGUAGE SQL;
--
-- Set up a Federated Table
--
-- Precondition: the federated server has to be set up via
-- CDB_SetUp_PG_Federated_Server
--
-- Postcondition: it generates a view in the schema of the user that
-- can be used through SQL and Maps API's.
--
-- E.g:
-- SELECT cartodb.CDB_SetUp_PG_Federated_Table(
-- 'amazon', -- mandatory, name of the federated server
-- 'my_remote_schema', -- mandatory, schema name
-- 'my_remote_table', -- mandatory, table name
-- 'id', -- mandatory, name of the id column
-- 'geom', -- optional, name of the geom column, preferably in 4326
-- 'webmercator' -- optional, should be in 3857 if present
-- );
--
CREATE OR REPLACE FUNCTION @extschema@.CDB_Federated_Table_Register(
server TEXT,
remote_schema TEXT,
remote_table TEXT,
id_column TEXT,
geom_column TEXT DEFAULT NULL,
webmercator_column TEXT DEFAULT NULL,
local_name NAME DEFAULT NULL
)
RETURNS void
AS $$
DECLARE
server_internal name := @extschema@.__CDB_FS_Generate_Server_Name(input_name := server, check_existence := false);
local_schema name := @extschema@.__CDB_FS_Create_Schema(server_internal, remote_schema);
src_table REGCLASS;
rest_of_cols TEXT[];
geom_expression TEXT;
webmercator_expression TEXT;
carto_columns_expression TEXT[];
BEGIN
-- Use geom_column as default for webmercator_column
IF webmercator_column IS NULL THEN
webmercator_column := geom_column;
END IF;
IF local_name IS NULL THEN
local_name := remote_table;
END IF;
-- Import the foreign table
EXECUTE FORMAT ('IMPORT FOREIGN SCHEMA %I LIMIT TO (%I) FROM SERVER %I INTO %I;', remote_schema, remote_table, server_internal, local_schema);
src_table := format('%I.%I', local_schema, remote_table);
--- Grant SELECT to fdw role (TODO: Re-enable if needed)
--- EXECUTE FORMAT ('GRANT SELECT ON %I.%I TO %I;', fdw_objects_name, table_name, fdw_objects_name);
-- Check id_column is numeric
IF NOT @extschema@.__CDB_FS_Column_Is_Integer(src_table, id_column) THEN
RAISE EXCEPTION 'non integer id_column "%"', id_column;
END IF;
-- Check if the geom and mercator columns have a geometry type (if provided)
IF geom_column IS NOT NULL AND NOT @extschema@.__CDB_FS_Column_Is_Geometry(src_table, geom_column) THEN
RAISE EXCEPTION 'non geometry column "%"', geom_column;
END IF;
IF webmercator_column IS NOT NULL AND NOT @extschema@.__CDB_FS_Column_Is_Geometry(src_table, webmercator_column) THEN
RAISE EXCEPTION 'non geometry column "%"', webmercator_column;
END IF;
-- Get a list of columns excluding the id, geom and the_geom_webmercator
SELECT ARRAY(
SELECT quote_ident(c) FROM @extschema@.__CDB_FS_GetColumns(src_table) AS c
WHERE c NOT IN (SELECT * FROM (SELECT unnest(ARRAY[id_column, geom_column, webmercator_column, 'cartodb_id', 'the_geom', 'the_geom_webmercator']) col) carto WHERE carto.col IS NOT NULL)
) INTO rest_of_cols;
IF geom_column IS NULL
THEN
geom_expression := 'NULL AS the_geom';
ELSIF @postgisschema@.Find_SRID(local_schema::varchar, remote_table::varchar, geom_column::varchar) = 4326
THEN
geom_expression := format('t.%I AS the_geom', geom_column);
ELSE
-- It needs an ST_Transform to 4326
geom_expression := format('@postgisschema@.ST_Transform(t.%I, 4326) AS the_geom', geom_column);
END IF;
IF webmercator_column IS NULL
THEN
webmercator_expression := 'NULL AS the_geom_webmercator';
ELSIF @postgisschema@.Find_SRID(local_schema::varchar, remote_table::varchar, webmercator_column::varchar) = 3857
THEN
webmercator_expression := format('t.%I AS the_geom_webmercator', webmercator_column);
ELSE
-- It needs an ST_Transform to 3857
webmercator_expression := format('@postgisschema@.ST_Transform(t.%I, 3857) AS the_geom_webmercator', webmercator_column);
END IF;
-- CARTO columns expressions
carto_columns_expression := ARRAY[
format('t.%1$I AS cartodb_id', id_column),
geom_expression,
webmercator_expression
];
-- Create a view with homogeneous CDB fields
BEGIN
EXECUTE format(
'CREATE OR REPLACE VIEW %1$I AS
SELECT %2s
FROM %3$s t',
local_name,
array_to_string(carto_columns_expression || rest_of_cols, ','),
src_table
);
EXCEPTION WHEN OTHERS THEN
RAISE EXCEPTION 'Could not import table "%" as "%": %', remote_table, local_name, SQLERRM;
END;
-- TODO: Handle this Grant perms to the view
-- EXECUTE format('GRANT SELECT ON %I TO %s', table_name, fdw_objects_name);
END
$$
LANGUAGE PLPGSQL VOLATILE PARALLEL UNSAFE;

View File

@@ -83,10 +83,10 @@ SET client_min_messages TO notice;
-- Test the listing functions
-- ===================================================================
\echo 'Test listing of remote schemas (sunny day)'
SELECT * FROM cartodb.CDB_Federated_Server_List_Remote_Schemas(remote_server => 'loopback');
SELECT * FROM cartodb.CDB_Federated_Server_List_Remote_Schemas(server => 'loopback');
\echo 'Test listing of remote tables (sunny day)'
SELECT * FROM cartodb.CDB_Federated_Server_List_Remote_Tables(remote_server => 'loopback', remote_schema => 'S 1');
SELECT * FROM cartodb.CDB_Federated_Server_List_Remote_Tables(server => 'loopback', remote_schema => 'S 1');
-- ===================================================================
-- Cleanup
@@ -111,4 +111,5 @@ SELECT 'D1', cartodb.CDB_Federated_Server_Unregister(server := 'loopback'::text)
SELECT 'D2', cartodb.CDB_Federated_Server_Unregister(server := 'loopback2'::text);
DROP DATABASE cdb_fs_tester;
DROP ROLE cdb_fs_tester;
DROP EXTENSION postgres_fdw;
\set QUIET off

View File

@@ -0,0 +1,82 @@
-- ===================================================================
-- create FDW objects
-- ===================================================================
\set QUIET on
SET client_min_messages TO error;
\set VERBOSITY terse
SET SESSION AUTHORIZATION postgres;
CREATE EXTENSION postgres_fdw;
CREATE ROLE cdb_fs_tester SUPERUSER LOGIN PASSWORD 'cdb_fs_passwd';
CREATE DATABASE cdb_fs_tester OWNER cdb_fs_tester;
SELECT 'C1', cartodb.CDB_Federated_Server_Register_PG(server := 'loopback'::text, config := '{
"server": {
"host": "localhost",
"port": @@PGPORT@@
},
"credentials": {
"username": "cdb_fs_tester",
"password": "cdb_fs_passwd"
}
}'::jsonb);
-- ===================================================================
-- create objects used through FDW loopback server
-- ===================================================================
\c cdb_fs_tester postgres
CREATE EXTENSION postgis;
\c cdb_fs_tester cdb_fs_tester
CREATE SCHEMA remote_schema;
CREATE TABLE remote_schema.remote_geom(id int, another_field text, geom geometry(Geometry,4326));
INSERT INTO remote_schema.remote_geom VALUES (1, 'patata', 'SRID=4326;POINT(1 1)'::geometry);
INSERT INTO remote_schema.remote_geom VALUES (2, 'patata2', 'SRID=4326;POINT(2 2)'::geometry);
CREATE TABLE remote_schema.remote_geom2(id bigint, another_field text, geom geometry(Geometry,4326), geom_mercator geometry(Geometry,3857));
INSERT INTO remote_schema.remote_geom2 VALUES (3, 'patata', 'SRID=4326;POINT(3 3)'::geometry, 'SRID=3857;POINT(3 3)');
CREATE TABLE remote_schema.remote_other(id bigint, field text, field2 text);
INSERT INTO remote_schema.remote_other VALUES (1, 'delicious', 'potatoes');
-- ===================================================================
-- Test the listing functions
-- ===================================================================
\c contrib_regression postgres
SET client_min_messages TO error;
\set VERBOSITY terse
\set QUIET off
SELECT 'R1', cartodb.CDB_Federated_Table_Register(
server => 'loopback',
remote_schema => 'remote_schema',
remote_table => 'remote_geom',
id_column => 'id',
geom_column => 'geom'
);
SELECT 'V1', pg_get_viewdef('remote_geom');
SELECT 'S1', cartodb_id, ST_AsText(the_geom), another_field FROM remote_geom;
-- ===================================================================
-- Cleanup
-- ===================================================================
\set QUIET on
\c contrib_regression postgres
SET client_min_messages TO error;
\set VERBOSITY terse
SELECT 'D1', cartodb.CDB_Federated_Server_Unregister(server := 'loopback'::text);
DROP DATABASE cdb_fs_tester;
DROP ROLE cdb_fs_tester;
DROP EXTENSION postgres_fdw;
\set QUIET off

View File

@@ -0,0 +1,10 @@
C1|
R1|
V1| SELECT t.id AS cartodb_id,
t.geom AS the_geom,
st_transform(t.geom, 3857) AS the_geom_webmercator,
t.another_field
FROM cdb_fs_schema_b904664b5208433cd85a1693ba4f7570.remote_geom t;
S1|1|POINT(1 1)|patata
S1|2|POINT(2 2)|patata2
D1|