-- wait_for_streaming_lag: block until streaming-replication flush lag is acceptable.
--
-- Polls pg_stat_replication once per second and returns as soon as the largest
-- flush lag (bytes between pg_current_xlog_location() and a standby's
-- flush_location) is at or below the active water mark.
--
-- Hysteresis: the first check compares against high_water_mark, so a caller
-- that is only slightly ahead of the standbys is not delayed at all.  If that
-- first check fails, every later check compares against low_water_mark, i.e.
-- once we start waiting we wait until the standbys have really caught up.
--
-- Standbys that disappear from pg_stat_replication are remembered in the
-- session-local temp table "lag" (with their last seen flush_location) for
-- 5 minutes, so a briefly disconnected standby still counts towards the lag.
--
-- Parameters:
--   low_water_mark   lag in bytes to wait for once waiting has started
--   high_water_mark  lag in bytes tolerated on the very first check
--   tmout            maximum time to wait, measured from the function call
--
-- Returns: the last measured maximum lag in bytes (0 when no standby is known).
-- Raises:  SQLSTATE 'TF001' when the lag has not dropped in time.
--
-- Notes:
--   * Uses the pre-PostgreSQL-10 names (pg_xlog_location_diff,
--     pg_current_xlog_location, pg_stat_replication.flush_location).
--   * NOTE(review): rows whose flush_location is NULL (e.g. when the invoker
--     is not allowed to see replication details) are ignored by max() and so
--     count as "no lag" -- confirm the function is run by a suitable role.
CREATE OR REPLACE FUNCTION wait_for_streaming_lag(low_water_mark BIGINT DEFAULT 1000000, high_water_mark BIGINT DEFAULT 20000000, tmout INTERVAL DEFAULT '4h')
RETURNS BIGINT
AS $def$
DECLARE
    r                 RECORD;
    water_mark        BIGINT;
    -- Wall-clock time of the call.  now() would be the transaction start and
    -- would make the timeout fire early inside a long-running transaction.
    started_at        TIMESTAMPTZ := clock_timestamp();
    -- Caller's message level, restored after the temp table has been created.
    prev_min_messages TEXT := current_setting('client_min_messages');
BEGIN
    -- Silence the "relation already exists, skipping" notice on repeated calls.
    SET LOCAL client_min_messages TO ERROR;
    CREATE TEMP TABLE IF NOT EXISTS lag (
        gen              INT,          -- generation counter, bumped every poll
        application_name TEXT,
        client_addr      INET,
        flush_location   TEXT,
        lmd              TIMESTAMPTZ   -- when the standby was last seen live
    );
    -- Put back whatever the caller had instead of forcing NOTICE.
    PERFORM set_config('client_min_messages', prev_min_messages, true);

    water_mark := high_water_mark;     -- use high_water_mark for the first loop

    LOOP
        -- Write a new generation: one row per (application_name, client_addr).
        -- Live rows from pg_stat_replication (ord = 1) win over remembered
        -- rows (ord = 2); among equals the smallest flush position is kept.
        WITH cur_gen AS (
                 SELECT max(gen) AS gen
                   FROM lag
             ),
             candidates AS (
                 SELECT 1 AS ord, application_name, client_addr, flush_location, clock_timestamp() AS lmd
                   FROM pg_stat_replication
                  UNION ALL
                 SELECT 2 AS ord, application_name, client_addr, flush_location, lmd
                   FROM lag
             )
        INSERT INTO lag (gen, application_name, client_addr, flush_location, lmd)
        SELECT coalesce(cur_gen.gen + 1, 1),
               rx.application_name,
               rx.client_addr,
               rx.flush_location,
               rx.lmd
          FROM (SELECT DISTINCT ON (application_name, client_addr)
                       application_name, client_addr, flush_location, lmd
                  FROM candidates
                 ORDER BY application_name,
                          client_addr,
                          ord ASC,
                          pg_xlog_location_diff(flush_location, '0/0') ASC) rx
         CROSS JOIN cur_gen;

        -- Keep only the generation just written ...
        DELETE FROM lag WHERE gen < (SELECT max(gen) FROM lag);
        -- ... and forget standbys that have been gone for more than 5 minutes.
        DELETE FROM lag WHERE lmd < clock_timestamp() - '5min'::INTERVAL;

        -- Worst lag over all known standbys; 0 when there are none.
        SELECT coalesce(max(pg_xlog_location_diff(pg_current_xlog_location(), flush_location)), 0) AS lag,
               clock_timestamp() - started_at AS tm
          INTO r
          FROM lag;

        EXIT WHEN r.lag <= water_mark;

        IF r.tm > tmout THEN
            RAISE EXCEPTION USING
                MESSAGE='Timeout while waiting for streaming lag to drop below ' || low_water_mark,
                ERRCODE='TF001';
        END IF;

        -- From now on we are waiting: require the lag to reach the low mark.
        water_mark := low_water_mark;
        PERFORM pg_sleep(1);
    END LOOP;

    RETURN r.lag;
END;
$def$ LANGUAGE plpgsql VOLATILE SECURITY invoker;
