222 lines
6.8 KiB
PL/PgSQL
222 lines
6.8 KiB
PL/PgSQL
--------------------------------------------------------------------------------
|
|
-- Private functions
|
|
--------------------------------------------------------------------------------
|
|
|
|
--
|
|
-- Import a foreign table if it does not exist
|
|
--
|
|
CREATE OR REPLACE FUNCTION @extschema@.__CDB_FS_Import_If_Not_Exists(server_internal name, remote_schema name, remote_table name)
|
|
RETURNS void
|
|
AS $$
|
|
DECLARE
|
|
local_schema name := @extschema@.__CDB_FS_Create_Schema(server_internal, remote_schema);
|
|
BEGIN
|
|
IF NOT EXISTS (
|
|
SELECT * FROM pg_class
|
|
WHERE relnamespace = (SELECT oid FROM pg_namespace WHERE nspname = local_schema)
|
|
AND relname = remote_table
|
|
) THEN
|
|
EXECUTE format('IMPORT FOREIGN SCHEMA %I LIMIT TO (%I) FROM SERVER %I INTO %I',
|
|
remote_schema, remote_table, server_internal, local_schema);
|
|
END IF;
|
|
END
|
|
$$
|
|
LANGUAGE PLPGSQL VOLATILE PARALLEL UNSAFE;
|
|
|
|
--
|
|
-- Get the version of a remote PG server
|
|
--
|
|
CREATE OR REPLACE FUNCTION @extschema@.__CDB_FS_Foreign_Server_Version_PG(server_internal name)
|
|
RETURNS text
|
|
AS $$
|
|
DECLARE
|
|
remote_schema name := 'pg_catalog';
|
|
remote_table name := 'pg_settings';
|
|
local_schema name := @extschema@.__CDB_FS_Create_Schema(server_internal, remote_schema);
|
|
remote_server_version text;
|
|
BEGIN
|
|
PERFORM @extschema@.__CDB_FS_Import_If_Not_Exists(server_internal, remote_schema, remote_table);
|
|
|
|
BEGIN
|
|
EXECUTE format('
|
|
SELECT setting FROM %I.%I WHERE name = ''server_version'';
|
|
', local_schema, remote_table) INTO remote_server_version;
|
|
EXCEPTION WHEN OTHERS THEN
|
|
RAISE EXCEPTION 'Not enough permissions to access the server "%"',
|
|
@extschema@.__CDB_FS_Extract_Server_Name(server_internal);
|
|
END;
|
|
|
|
RETURN remote_server_version;
|
|
END
|
|
$$
|
|
LANGUAGE PLPGSQL VOLATILE PARALLEL UNSAFE;
|
|
|
|
|
|
--
|
|
-- Get the PostGIS extension version of a remote PG server
|
|
--
|
|
CREATE OR REPLACE FUNCTION @extschema@.__CDB_FS_Foreign_PostGIS_Version_PG(server_internal name)
|
|
RETURNS text
|
|
AS $$
|
|
DECLARE
|
|
remote_schema name := 'pg_catalog';
|
|
remote_table name := 'pg_extension';
|
|
local_schema name := @extschema@.__CDB_FS_Create_Schema(server_internal, remote_schema);
|
|
remote_postgis_version text;
|
|
BEGIN
|
|
PERFORM @extschema@.__CDB_FS_Import_If_Not_Exists(server_internal, remote_schema, remote_table);
|
|
|
|
BEGIN
|
|
EXECUTE format('
|
|
SELECT extversion FROM %I.%I WHERE extname = ''postgis'';
|
|
', local_schema, remote_table) INTO remote_postgis_version;
|
|
EXCEPTION WHEN OTHERS THEN
|
|
RAISE EXCEPTION 'Not enough permissions to access the server "%"',
|
|
@extschema@.__CDB_FS_Extract_Server_Name(server_internal);
|
|
END;
|
|
|
|
RETURN remote_postgis_version;
|
|
END
|
|
$$
|
|
LANGUAGE PLPGSQL VOLATILE PARALLEL UNSAFE;
|
|
|
|
|
|
--
|
|
-- Get the foreign server options of a remote PG server
|
|
--
|
|
CREATE OR REPLACE FUNCTION @extschema@.__CDB_FS_Foreign_Server_Options_PG(server_internal name)
|
|
RETURNS jsonb
|
|
AS $$
|
|
-- See https://www.postgresql.org/docs/current/catalog-pg-foreign-server.html
|
|
-- See https://www.postgresql.org/docs/current/functions-info.html
|
|
SELECT jsonb_object_agg(opt.option_name, opt.option_value) FROM (
|
|
SELECT (pg_options_to_table(srvoptions)).*
|
|
FROM pg_foreign_server
|
|
WHERE srvname = server_internal
|
|
) AS opt;
|
|
$$
|
|
LANGUAGE SQL VOLATILE PARALLEL UNSAFE;
|
|
|
|
|
|
--
|
|
-- Get a foreign PG server hostname from the catalog
|
|
--
|
|
CREATE OR REPLACE FUNCTION @extschema@.__CDB_FS_Foreign_Server_Host_PG(server_internal name)
|
|
RETURNS text
|
|
AS $$
|
|
BEGIN
|
|
-- TODO: implement
|
|
RETURN 'localhost';
|
|
END
|
|
$$
|
|
LANGUAGE PLPGSQL VOLATILE PARALLEL UNSAFE;
|
|
|
|
|
|
--
|
|
-- Get a foreign PG server port from the catalog
|
|
--
|
|
CREATE OR REPLACE FUNCTION @extschema@.__CDB_FS_Foreign_Server_Port_PG(server_internal name)
|
|
RETURNS integer
|
|
AS $$
|
|
BEGIN
|
|
-- TODO: implement
|
|
RETURN 5432;
|
|
END
|
|
$$
|
|
LANGUAGE PLPGSQL VOLATILE PARALLEL UNSAFE;
|
|
|
|
--
|
|
-- Get one measure of network latency in ms to a remote TCP server
|
|
--
|
|
CREATE OR REPLACE FUNCTION @extschema@.__CDB_FS_TCP_Network_Latency(
|
|
host text,
|
|
port integer DEFAULT 5432,
|
|
timeout_seconds float DEFAULT 5.0
|
|
)
|
|
RETURNS float
|
|
AS $$
|
|
import socket
|
|
from timeit import default_timer as timer
|
|
|
|
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
s.settimeout(timeout_seconds)
|
|
|
|
t_start = timer()
|
|
|
|
try:
|
|
s.connect((host, int(port)))
|
|
t_stop = timer()
|
|
s.shutdown(socket.SHUT_RD)
|
|
except (socket.timeout, OSError, socket.error):
|
|
return None
|
|
|
|
return (t_stop - t_start) * 1000.0
|
|
$$
|
|
LANGUAGE plpythonu VOLATILE PARALLEL UNSAFE;
|
|
|
|
--
|
|
-- Get the network latency to a remote PG server
|
|
--
|
|
CREATE OR REPLACE FUNCTION @extschema@.__CDB_FS_Foreign_Server_Latency_PG(server_internal name)
|
|
RETURNS float
|
|
AS $$
|
|
DECLARE
|
|
remote_server_host text := @extschema@.__CDB_FS_Foreign_Server_Host_PG(server_internal);
|
|
remote_server_port integer := @extschema@.__CDB_FS_Foreign_Server_Port_PG(server_internal);
|
|
latency float;
|
|
BEGIN
|
|
latency := @extschema@.__CDB_FS_TCP_Network_Latency(host => remote_server_host, port => remote_server_port);
|
|
RETURN latency;
|
|
END
|
|
$$
|
|
LANGUAGE PLPGSQL VOLATILE PARALLEL UNSAFE;
|
|
|
|
|
|
--
|
|
-- Collect and return diagnostics info from a remote PG into a jsonb
|
|
--
|
|
CREATE OR REPLACE FUNCTION @extschema@.__CDB_FS_Server_Diagnostics_PG(server_internal name)
|
|
RETURNS jsonb
|
|
AS $$
|
|
DECLARE
|
|
remote_server_version text := @extschema@.__CDB_FS_Foreign_Server_Version_PG(server_internal);
|
|
remote_postgis_version text := @extschema@.__CDB_FS_Foreign_PostGIS_Version_PG(server_internal);
|
|
remote_server_options jsonb := @extschema@.__CDB_FS_Foreign_Server_Options_PG(server_internal);
|
|
remote_server_latency float := @extschema@.__CDB_FS_Foreign_Server_Latency_PG(server_internal);
|
|
BEGIN
|
|
RETURN jsonb_build_object(
|
|
'server_version', remote_server_version,
|
|
'postgis_version', remote_postgis_version,
|
|
'server_options', remote_server_options,
|
|
'server_latency', remote_server_latency
|
|
);
|
|
END
|
|
$$
|
|
LANGUAGE PLPGSQL VOLATILE PARALLEL UNSAFE;
|
|
|
|
|
|
|
|
--------------------------------------------------------------------------------
|
|
-- Public functions
|
|
--------------------------------------------------------------------------------
|
|
|
|
--
|
|
-- Collect and return diagnostics info from a remote PG into a jsonb
|
|
--
|
|
CREATE OR REPLACE FUNCTION @extschema@.CDB_Federated_Server_Diagnostics(server TEXT)
|
|
RETURNS jsonb
|
|
AS $$
|
|
DECLARE
|
|
server_internal name := @extschema@.__CDB_FS_Generate_Server_Name(input_name => server, check_existence => true);
|
|
server_type name := @extschema@.__CDB_FS_server_type(server_internal);
|
|
BEGIN
|
|
CASE server_type
|
|
WHEN 'postgres_fdw' THEN
|
|
RETURN @extschema@.__CDB_FS_Server_Diagnostics_PG(server_internal);
|
|
ELSE
|
|
RAISE EXCEPTION 'Not implemented server type % for remote server %', server_type, server;
|
|
END CASE;
|
|
END
|
|
$$
|
|
LANGUAGE PLPGSQL VOLATILE PARALLEL UNSAFE;
|