Friday, September 28, 2012

ORACLE STREAMS BIDIRECTIONAL CONFIGURATION WITH CONFLICT RESOLUTION BEST PRACTICES

This article recommends best practices for configuring Streams for Oracle Database 10g Release 2. The discussion is divided into the following sections:
• Guidelines for Preparing All Streams Configurations
• Recommendations for Downstream Capture Configurations
• Recommendations for Upstream (Local) Capture Configurations
All users should perform the instructions in the Guidelines for Preparing All Streams Configurations section of this article before branching off to either the downstream or the upstream configuration recommendations.
The examples in this article are based on a configuration that uses:
• Two databases
• Streams database administrator privileges
• Automatic Storage Management (ASM).
Guidelines for Preparing All Streams Configurations
The following list summarizes the tasks you must perform to properly prepare each Streams database:
Use Oracle Database 10g Release 2 (10.2.0.4)

Ensure that all Streams databases are running Oracle Database 10g Release 2 (release 10.2.0.4 is recommended) and apply any critical patch sets. Although the source database must be running Oracle Database 10g Release 2, the downstream capture database can run Release 10.2.0.4 or a later release.
See the Oracle Database Upgrade Guide [7] for database upgrade information and the "Patches and Updates" tab on Oracle Metalink for information about critical Streams patches for release 10.2 and higher releases.
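To confirm the release and patch level on each database, a quick check (output varies by environment):

select banner from v$version;
select version, instance_name from v$instance;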

Verify platform requirements for Downstream Capture
Downstream capture requires that the source and destination databases run on the same platform.

Prepare the Source and Target databases Redo logs for Streams
1. Configure the source and target databases in ARCHIVELOG mode. For one-way Streams local capture, only the source database must run in ARCHIVELOG mode, but the best practice is for both the source and target databases to be in ARCHIVELOG mode in case media recovery is required for either database. If replication is bidirectional, then you must configure both the source and target databases in ARCHIVELOG mode.
2. Configure the local archive destination parameter, LOG_ARCHIVE_DEST_1, and do not use a flash recovery area. Streams local capture and downstream capture require online redo logs and archived redo logs. Do not place archived redo log files in the flash recovery area, because it is a fixed-size storage area and files there may be deleted automatically when space runs low. A sketch of both steps follows this list.
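The following is a minimal sketch of both steps, assuming the +DATA disk group and PRODDB_TULSA naming used elsewhere in this article (on RAC, enabling ARCHIVELOG mode requires additional steps, such as temporarily setting CLUSTER_DATABASE=FALSE):

-- Step 1: enable ARCHIVELOG mode (requires a clean shutdown)
shutdown immediate
startup mount
alter database archivelog;
alter database open;

-- Step 2: set a fixed local archive destination instead of the flash recovery area
alter system set log_archive_dest_1='LOCATION=+DATA/PRODDB_TULSA/ MANDATORY' scope=both sid='*';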

Create a Tablespace Dedicated to Streams
Create a dedicated Streams tablespace to contain the Streams AQ tables and other objects related to Streams, and to allow for better space management for the Streams objects:
• For downstream capture, create the Streams tablespace on the downstream capture database only.
• For upstream capture (and bidirectional), create the Streams tablespace on both the source and target databases.
For example, to create a Streams tablespace of 50 MB, which is the minimum recommended size, issue the following statements:
CREATE TABLESPACE streams_data
LOGGING
EXTENT MANAGEMENT LOCAL AUTOALLOCATE
SEGMENT SPACE MANAGEMENT AUTO
DATAFILE '+DATA' SIZE 50M
AUTOEXTEND ON NEXT 50M MAXSIZE UNLIMITED ;

Create the Streams Administrator database user

Create the Streams administrator account that you will use to create and modify all Streams configurations and to perform administrative functions with the DBMS_STREAMS_ADM PL/SQL package. Do this on both the source and destination databases.
For example, to create a user named STRMADMIN with the default tablespace streams_data, enter the following SQL*Plus statements as SYSDBA:
-- Drop the user first if it already exists (COMMIT is not needed after DDL)
drop user STRMADMIN cascade;

CREATE USER STRMADMIN IDENTIFIED BY STRMADMIN
DEFAULT TABLESPACE STREAMS_DATA
TEMPORARY TABLESPACE TEMP
QUOTA UNLIMITED ON STREAMS_DATA;
-- Add a QUOTA UNLIMITED clause for a separate index tablespace here if you created one.

Grant Streams authorization and DBA privileges
Use the DBMS_STREAMS_AUTH.GRANT_ADMIN_PRIVILEGE procedure to grant all of the privileges necessary for the Streams package procedures to function properly. Do this on both the source and destination databases:

GRANT DBA TO STRMADMIN;

BEGIN
DBMS_STREAMS_AUTH.GRANT_ADMIN_PRIVILEGE(
grantee => 'STRMADMIN',
grant_privileges => true);
END;
/

Also, set LogMiner to use the Streams tablespace. Do this on both the source and destination databases.

exec DBMS_LOGMNR_D.SET_TABLESPACE ('STREAMS_DATA');
Check that the Streams administrator account is created:
SELECT * FROM dba_streams_administrator;

Set key initialization parameters
On each Streams database in the configuration, specify the initialization parameters shown below:
AQ_TM_PROCESSES=1
Set one queue monitor process. For example:
alter system set aq_tm_processes=1 scope=spfile sid='*';

DB_NAME
Query with: select name from v$database;
The DB_NAME is the same on each database in this example (PRODDB).

DB_UNIQUE_NAME
Each database's unique name is different. In this example, on servers ldbd01/02 the database's unique name is PRODDB_TULSA; on servers ldbd10/11 it is PRODDB_PLANO.

DB_DOMAIN
On each Streams database, specify the network domain where the database resides. In this example, the db_domain is DOMAIN.COM, so the first database's global name is:
PRODDB_TULSA.DOMAIN.COM

GLOBAL_NAMES=TRUE
On each Streams database, set the GLOBAL_NAMES initialization parameter to TRUE to ensure that database links work correctly. With db_domain set as above, "select * from global_name;" returns PRODDB_TULSA.DOMAIN.COM on the first database.

JOB_QUEUE_PROCESSES=4
Specify at least 4 job queue processes for Streams. If the parameter is already set, increase it by 4.

COMPATIBLE=10.2
Make certain the COMPATIBLE parameter is at least 10.2.0.

_JOB_QUEUE_INTERVAL=1
On each Streams database, this parameter controls how often, in seconds, the job queue is scanned for work. Setting it to 1 second improves propagation job performance and minimizes the delay between propagation job executions. Because this is a hidden parameter, set it as follows:
alter system set "_job_queue_interval"=1 scope=spfile sid='*';

TIMED_STATISTICS=TRUE
On each Streams database, set TIMED_STATISTICS=TRUE so that performance statistics are gathered for analysis of potential performance bottlenecks.

STATISTICS_LEVEL=TYPICAL or ALL
On each Streams database, set this parameter to TYPICAL to collect the necessary statistics, or to ALL to gather more than the bare minimum.

SHARED_POOL_SIZE=256M
On each Streams database, set this parameter to a minimum of 256 MB, because Streams uses a significant amount of PL/SQL and library cache components. Future performance analysis may provide recommendations for how much to increase SHARED_POOL_SIZE, especially on the database running Streams apply processes.

STREAMS_POOL_SIZE=256M
On each database where there is either a capture or apply process running, set the STREAMS_POOL_SIZE parameter to a minimum of 256 MB. Memory from the Streams pool is used by both the Streams processes and the buffered queue, with the LCRs in the buffered queue the largest component. Use the V$STREAMS_POOL_ADVICE view to help you size the buffered queue.
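A sketch of setting these parameters with ALTER SYSTEM, assuming an spfile and a RAC-style sid='*' (with Automatic Shared Memory Management, the pool sizes act as minimum values; adjust per the notes above):

alter system set aq_tm_processes=1 scope=spfile sid='*';
alter system set global_names=true scope=both sid='*';
alter system set job_queue_processes=4 scope=both sid='*';
alter system set "_job_queue_interval"=1 scope=spfile sid='*';
alter system set timed_statistics=true scope=both sid='*';
alter system set statistics_level=typical scope=both sid='*';
alter system set shared_pool_size=256M scope=both sid='*';
alter system set streams_pool_size=256M scope=both sid='*';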


Create Database links between the Source and Target databases

Before creating database links, ensure that all Streams databases are reachable using Oracle Net aliases. Perform the following steps to create database links:
1. Log into the target database as the Streams administrator user and determine the global database name by issuing the following query:
select * from global_name;
You will use the name returned by this query to create the database link in the next step.
2. Create a TNSNAMES.ORA entry for each database using Streams. Below is an example of the TNSNAMES.ORA file entry:
PRODDB_PLANO.DOMAIN.COM =
(DESCRIPTION =
(ADDRESS = (PROTOCOL = TCP)(HOST = hostname0-vip)(PORT = 1521))
(ADDRESS = (PROTOCOL = TCP)(HOST = hostname1-vip)(PORT = 1521))
(LOAD_BALANCE = yes)
(CONNECT_DATA =
(SERVER = DEDICATED)
(SERVICE_NAME = PRODDB_PLANO.DOMAIN.COM)
(FAILOVER_MODE =
(TYPE = SELECT)
(METHOD = BASIC)
(RETRIES = 180)
(DELAY = 5)
)
)
)

3. Log into the source database as the Streams administrator.
conn strmadmin/strmadmin
4. Create a database link from the source database to the target database first. For example, on the source database, enter the following statement to create a database link to the target database named PRODDB_TULSA.DOMAIN.COM, using the TNSNAMES.ORA alias PRODDB_TULSA.DOMAIN.COM:
CREATE DATABASE LINK PRODDB_TULSA.DOMAIN.COM CONNECT TO
STRMADMIN IDENTIFIED BY STRMADMIN USING 'PRODDB_TULSA.DOMAIN.COM';

5. Validate that the database link is working by querying DUAL across the link on the source database:
select * from dual@PRODDB_TULSA.DOMAIN.COM;

D
-
X

If an error is returned, it is probably due to an incorrect TNSNAMES.ORA descriptor or to an incorrect database name. Do not proceed to the next step until the database link is working properly.

6. Log into the Streams Administrator account on the Destination database.
conn strmadmin/strmadmin@PRODDB_TULSA.DOMAIN.COM
7. Create a database link from the Destination database to the source database.
CREATE DATABASE LINK PRODDB_PLANO.DOMAIN.COM CONNECT TO
STRMADMIN IDENTIFIED BY STRMADMIN USING 'PRODDB_PLANO.DOMAIN.COM';

8. Validate that the database link is working by querying DUAL across the link on the target database:
select * from dual@PRODDB_PLANO.DOMAIN.COM;

9. If an error occurs, it is probably because of an incorrect TNSNAMES.ORA descriptor or an incorrect database name. Do not proceed to the next section until the database link is working properly.

Set up Directory Objects

In many cases, Oracle Streams instantiates the objects for which changes will be captured and uses Data Pump to initially populate the replica tables at the target database. As part of this instantiation process, Data Pump needs a file system directory location on both the source and target database servers for staging and moving all Streams objects. Streams uses Oracle directory objects that point to these file system directory locations.
Directory objects are needed by Streams for instantiation purposes. Ensure that the locations to which the directory objects point have sufficient storage space to support all objects that Streams instantiates. On both the source and target database servers, determine an appropriate location for the file system directory and create Oracle directory objects on each of the databases. For example, as the Streams administrator user, create the directory objects as follows:
create directory streams_dir as '/SSM/oracle/admin/{$ORACLE_SID}/dpdump';
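If the directory object is created by a different user, grant the Streams administrator access to it (redundant while STRMADMIN holds DBA, but explicit):

grant read, write on directory streams_dir to STRMADMIN;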

Account for object or tablespace name differences when replicating DDLs
If you choose to replicate DDLs, then you need to account for object or tablespace name differences between the source and target databases. Here are some best practices:
• Avoid system-generated naming for constraints or indexes
• Keep the same tablespace names between databases or use a DDL handler to explicitly address the naming differences
Recommendations for Bidirectional Configurations
Downstream capture is typically used to replicate a full database, to offload processing from the source database to the target database, or to reduce data loss with ASYNC or SYNC redo transport.
Specify initialization parameters on the Source and Target databases.
Note the example below. Be sure that the parameters you specify match the locations and parameter values in your Streams configuration.
Source Database:

LOG_ARCHIVE_DEST_1=
'LOCATION=+DATA/PRODDB_TULSA/
VALID_FOR=(ONLINE_LOGFILES,PRIMARY_ROLE) MANDATORY'
LOG_ARCHIVE_DEST_2=
'SERVICE=PRODDB_PLANO.DOMAIN.COM
LGWR ASYNC NOREGISTER
VALID_FOR=(ONLINE_LOGFILES,PRIMARY_ROLE)
DB_UNIQUE_NAME=PRODDB_PLANO'
LOG_ARCHIVE_DEST_STATE_1=ENABLE
LOG_ARCHIVE_DEST_STATE_2=DEFER
LOG_ARCHIVE_CONFIG='SEND, DG_CONFIG=(PRODDB_TULSA,PRODDB_PLANO)'
LOG_ARCHIVE_MAX_PROCESSES=4

Target Database:

LOG_ARCHIVE_DEST_1=
'LOCATION=use_db_recovery_file_dest
VALID_FOR=(ONLINE_LOGFILES,PRIMARY_ROLE)'
LOG_ARCHIVE_DEST_2=
'LOCATION=+DATA/PRODDB_PLANO/ARCH_TULSA_STREAMS
VALID_FOR=(STANDBY_LOGFILE,PRIMARY_ROLE)'
LOG_ARCHIVE_DEST_STATE_1=ENABLE
LOG_ARCHIVE_DEST_STATE_2=ENABLE
LOG_ARCHIVE_CONFIG='RECEIVE, DG_CONFIG=(PRODDB_TULSA,PRODDB_PLANO)'
LOG_ARCHIVE_MAX_PROCESSES=4

Configure Standby Redo logs.
To enable real-time mining for capture at the downstream database, you must configure standby redo logs (SRLs) on the downstream database to accept redo from the source database. The Streams capture process mines the standby redo logs and any archived redo logs created from the standby redo logs.
At minimum, configure (number of online redo log groups per thread + 1) standby redo log groups for each thread. In this example there are 2 threads, each with 4 online redo log groups, so we need at least 5 standby redo log groups per thread. Get the maximum GROUP# from V$LOG:

SQL> select thread#, group# from v$log order by 1,2;

THREAD# GROUP#
---------- ----------
1 1
1 2
1 3
1 4
2 5
2 6
2 7
2 8

8 rows selected.

I will start adding the standby redo log groups at group number 9, because the last online redo log group is 8; groups 9 through 18 are used below.

alter system set standby_file_management=manual sid='*';

ALTER DATABASE ADD STANDBY LOGFILE THREAD 1
GROUP 9 SIZE 50M,
GROUP 10 SIZE 50M,
GROUP 11 SIZE 50M,
GROUP 12 SIZE 50M,
GROUP 13 SIZE 50M;

ALTER DATABASE ADD STANDBY LOGFILE THREAD 2
GROUP 14 SIZE 50M,
GROUP 15 SIZE 50M,
GROUP 16 SIZE 50M,
GROUP 17 SIZE 50M,
GROUP 18 SIZE 50M;

SQL> select thread#, group# from v$standby_log order by 1,2;

THREAD# GROUP#
---------- ----------
1 9
1 10
1 11
1 12
1 13
2 14
2 15
2 16
2 17
2 18

10 rows selected.

alter system set standby_file_management=auto sid='*';


Bidirectional Streams Setup

You must have at least one schema (Ex. "SSM") in two different databases Source and Target.

Set up two queues, for capture and apply, in the SRC database as shown below:

connect STRMADMIN/STRMADMIN@PRODDB_PLANO.DOMAIN.COM

begin
dbms_streams_adm.set_up_queue(
queue_table => 'APPLY_PLANOTAB',
queue_name => 'APPLY_PLANO',
queue_user => 'STRMADMIN');
end;
/

begin
dbms_streams_adm.set_up_queue(
queue_table => 'CAPTURE_PLANOTAB',
queue_name => 'CAPTURE_PLANO',
queue_user => 'STRMADMIN');
end;
/

Set up two queues, for capture and apply, in the DEST database as shown below:

connect STRMADMIN/STRMADMIN@PRODDB_TULSA.DOMAIN.COM

begin
dbms_streams_adm.set_up_queue(
queue_table => 'APPLY_TULSATAB',
queue_name => 'APPLY_TULSA',
queue_user => 'STRMADMIN');
end;
/

begin
dbms_streams_adm.set_up_queue(
queue_table => 'CAPTURE_TULSATAB',
queue_name => 'CAPTURE_TULSA',
queue_user => 'STRMADMIN');
end;
/

Configure capture process on SRC database

connect STRMADMIN/STRMADMIN@PRODDB_PLANO.DOMAIN.COM

begin
dbms_streams_adm.add_schema_rules (
schema_name => 'SSM',
streams_type => 'CAPTURE',
streams_name => 'CAPTURES_PLANO',
queue_name => 'CAPTURE_PLANO',
include_dml => TRUE,
include_ddl => TRUE,
inclusion_rule => TRUE);
end;
/

Configure apply process on SRC database

connect STRMADMIN/STRMADMIN@PRODDB_PLANO.DOMAIN.COM

begin
dbms_streams_adm.add_schema_rules (
schema_name => 'SSM',
streams_type => 'APPLY',
streams_name => 'APPLIES_PLANO',
queue_name => 'APPLY_PLANO',
include_dml => TRUE,
include_ddl => TRUE,
source_database => 'PRODDB_TULSA.DOMAIN.COM');
end;
/

Configure propagation process on SRC Database:

connect STRMADMIN/STRMADMIN@PRODDB_PLANO.DOMAIN.COM

begin
dbms_streams_adm.add_schema_propagation_rules (
schema_name => 'SSM',
streams_name => 'PROP_PLANO_TO_TULSA',
source_queue_name => 'CAPTURE_PLANO',
destination_queue_name => 'APPLY_TULSA@PRODDB_TULSA.DOMAIN.COM',
include_dml => TRUE,
include_ddl => TRUE,
source_database => 'PRODDB_PLANO.DOMAIN.COM',
queue_to_queue => TRUE);
end;
/

Configure Schema Supplemental logging on SRC Database:

connect STRMADMIN/STRMADMIN@PRODDB_PLANO.DOMAIN.COM

EXEC DBMS_CAPTURE_ADM.ABORT_SCHEMA_INSTANTIATION('SSM');
EXEC DBMS_CAPTURE_ADM.PREPARE_SCHEMA_INSTANTIATION('SSM','ALL');


Configure capture process on DEST Database:

connect STRMADMIN/STRMADMIN@PRODDB_TULSA.DOMAIN.COM

begin
dbms_streams_adm.add_schema_rules (
schema_name => 'SSM',
streams_type => 'CAPTURE',
streams_name => 'CAPTURES_TULSA',
queue_name => 'CAPTURE_TULSA',
include_dml => TRUE,
include_ddl => TRUE);
end;
/


Set the schema instantiation SCN on SRC using the SCN of the DEST database:

connect STRMADMIN/STRMADMIN@PRODDB_TULSA.DOMAIN.COM

SET SERVEROUTPUT ON SIZE 1000000
declare
v_scn number;
begin
v_scn := dbms_flashback.get_system_change_number();
dbms_output.put_line('Instantiation SCN is: ' || v_scn);
dbms_apply_adm.set_schema_instantiation_scn@PRODDB_PLANO.DOMAIN.COM (
source_schema_name => 'SSM',
source_database_name => 'PRODDB_TULSA.DOMAIN.COM',
instantiation_scn => v_scn,
recursive => TRUE);
end;
/

If you experience any TNS issues, reload the listener on the source and retry. If you are still receiving errors, check the SCN on each database. You will need to export the schema from PLANO and import it into TULSA, and then retry; see the Data Pump sketch below.
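A minimal Data Pump sketch of that export/import, assuming the streams_dir directory object created earlier (credentials and file names are illustrative, the dump file must be copied between servers, and flashback_scn should be the instantiation SCN printed above):

expdp strmadmin/strmadmin@PRODDB_PLANO.DOMAIN.COM schemas=SSM directory=STREAMS_DIR dumpfile=ssm.dmp logfile=ssm_exp.log flashback_scn=<instantiation_scn>

impdp strmadmin/strmadmin@PRODDB_TULSA.DOMAIN.COM schemas=SSM directory=STREAMS_DIR dumpfile=ssm.dmp logfile=ssm_imp.log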

Configure apply process on DEST:

connect STRMADMIN/STRMADMIN@PRODDB_TULSA.DOMAIN.COM
begin
dbms_streams_adm.add_schema_rules (
schema_name => 'SSM',
streams_type => 'APPLY',
streams_name => 'APPLIES_TULSA',
queue_name => 'APPLY_TULSA',
include_dml => TRUE,
include_ddl => TRUE,
source_database => 'PRODDB_PLANO.DOMAIN.COM');
end;
/

Configure propagation process on DEST:

connect STRMADMIN/STRMADMIN@PRODDB_TULSA.DOMAIN.COM
begin
dbms_streams_adm.add_schema_propagation_rules (
schema_name => 'SSM',
streams_name => 'PROP_TULSA_TO_PLANO',
source_queue_name => 'CAPTURE_TULSA',
destination_queue_name => 'APPLY_PLANO@PRODDB_PLANO.DOMAIN.COM',
include_dml => TRUE,
include_ddl => TRUE,
source_database => 'PRODDB_TULSA.DOMAIN.COM',
queue_to_queue => TRUE);
end;
/

Configure Schema Supplemental logging on DEST Database:

connect STRMADMIN/STRMADMIN@PRODDB_TULSA.DOMAIN.COM

EXEC DBMS_CAPTURE_ADM.ABORT_SCHEMA_INSTANTIATION('SSM');
EXEC DBMS_CAPTURE_ADM.PREPARE_SCHEMA_INSTANTIATION('SSM','ALL');

At this point, the Streams configuration is in place on both databases; the capture and apply processes are started in a later step. The next step applies when you need to restart the replication process.
You can sanity-check that the environments are in sync by executing the following SQL on each:
select current_scn from v$database;
If the SCNs are close to one another, the Streams configuration is ready to start replication.

Set Schema Instantiation SCN on Target Database

Set the schema instantiation SCN on the DEST database using the SCN of the SRC database:

connect STRMADMIN/STRMADMIN@PRODDB_PLANO.DOMAIN.COM

SET SERVEROUTPUT ON SIZE 1000000
declare
v_scn number;
begin
v_scn := dbms_flashback.get_system_change_number();
dbms_output.put_line('Instantiation SCN is: ' || v_scn);
dbms_apply_adm.set_schema_instantiation_scn@PRODDB_TULSA.DOMAIN.COM(
source_schema_name => 'SSM',
source_database_name => 'PRODDB_PLANO.DOMAIN.COM',
instantiation_scn => v_scn,
recursive => TRUE);
end;
/
Add additional parameter settings to help ensure the best performance, and confirm that the Streams environment is correct.

On the Source:
connect STRMADMIN/STRMADMIN@PRODDB_PLANO.DOMAIN.COM

exec dbms_aqadm.ALTER_QUEUE_TABLE ('CAPTURE_PLANOTAB',PRIMARY_INSTANCE =>1,SECONDARY_INSTANCE =>2);
exec dbms_aqadm.ALTER_QUEUE_TABLE ('APPLY_PLANOTAB',PRIMARY_INSTANCE =>1,SECONDARY_INSTANCE =>2);
exec dbms_apply_adm.set_parameter('APPLIES_PLANO','disable_on_error','n');
exec dbms_apply_adm.set_parameter('APPLIES_PLANO', 'PARALLELISM', '6');
exec dbms_apply_adm.set_parameter('APPLIES_PLANO', '_TXN_BUFFER_SIZE', '20');
exec dbms_apply_adm.set_parameter('APPLIES_PLANO', '_HASH_TABLE_SIZE', '10000000');
exec dbms_capture_adm.set_parameter('CAPTURES_PLANO','_CHECKPOINT_FREQUENCY','500');
exec dbms_capture_adm.set_parameter('CAPTURES_PLANO','_SGA_SIZE','100');
exec dbms_capture_adm.set_parameter('CAPTURES_PLANO','_CHECKPOINT_FORCE','Y');
exec dbms_capture_adm.alter_capture(capture_name=>'CAPTURES_PLANO',checkpoint_retention_time=>3);
exec dbms_capture_adm.set_parameter('CAPTURES_PLANO','WRITE_ALERT_LOG','Y');
exec dbms_apply_adm.set_parameter('APPLIES_PLANO','WRITE_ALERT_LOG','Y');
set pages 0

select * from dba_capture;
select * from dba_propagation;
select * from dba_apply;
select * from dba_apply_error;

Do the same on the DEST:
connect STRMADMIN/STRMADMIN@PRODDB_TULSA.DOMAIN.COM

exec dbms_aqadm.ALTER_QUEUE_TABLE ('CAPTURE_TULSATAB',PRIMARY_INSTANCE =>1,SECONDARY_INSTANCE =>2);
exec dbms_aqadm.ALTER_QUEUE_TABLE ('APPLY_TULSATAB',PRIMARY_INSTANCE =>1,SECONDARY_INSTANCE =>2);
exec dbms_apply_adm.set_parameter('APPLIES_TULSA','disable_on_error','n');
exec dbms_apply_adm.set_parameter('APPLIES_TULSA', 'PARALLELISM', '6');
exec dbms_apply_adm.set_parameter('APPLIES_TULSA', '_TXN_BUFFER_SIZE', '20');
exec dbms_apply_adm.set_parameter('APPLIES_TULSA', '_HASH_TABLE_SIZE', '10000000');
exec dbms_capture_adm.set_parameter('CAPTURES_TULSA','_CHECKPOINT_FREQUENCY','500');
exec dbms_capture_adm.set_parameter('CAPTURES_TULSA','_SGA_SIZE','100');
exec dbms_capture_adm.set_parameter('CAPTURES_TULSA','_CHECKPOINT_FORCE','Y');
exec dbms_capture_adm.alter_capture(capture_name=>'CAPTURES_TULSA',checkpoint_retention_time=>3);
exec dbms_capture_adm.set_parameter('CAPTURES_TULSA','WRITE_ALERT_LOG','Y');
exec dbms_apply_adm.set_parameter('APPLIES_TULSA','WRITE_ALERT_LOG','Y');
set pages 0

select * from dba_capture;
select * from dba_propagation;
select * from dba_apply;
select * from dba_apply_error;

Start the Bidirectional Streams

Start Capture and Apply processes on DEST:

connect STRMADMIN/STRMADMIN@PRODDB_TULSA.DOMAIN.COM

exec dbms_apply_adm.start_apply(apply_name=> 'APPLIES_TULSA');
exec dbms_capture_adm.start_capture(capture_name=>'CAPTURES_TULSA');

Start Capture and Apply processes on SRC:

connect STRMADMIN/STRMADMIN@PRODDB_PLANO.DOMAIN.COM

exec dbms_apply_adm.start_apply(apply_name=> 'APPLIES_PLANO');
exec dbms_capture_adm.start_capture(capture_name=>'CAPTURES_PLANO');

Create the Heart Beat table on Source

connect SSM/SSM@PRODDB_PLANO.DOMAIN.COM

drop table SSM.SRC_DEST_HEART_BEAT;
create table SSM.SRC_DEST_HEART_BEAT(SRC_DATE timestamp);
insert into SSM.SRC_DEST_HEART_BEAT select sysdate from dual;
grant insert,update,delete,select on SRC_DEST_HEART_BEAT to STRMADMIN;

conn STRMADMIN/STRMADMIN@PRODDB_PLANO.DOMAIN.COM

variable jobno number
begin
dbms_job.submit(:jobno,'update ssm.src_dest_heart_beat set src_date =(select sysdate from dual);',sysdate,'sysdate+300/(60*60*24)');
commit;
end;
/
create or replace procedure strm_build_proc
is
scn NUMBER;
BEGIN
-- Dump a fresh copy of the data dictionary to the redo stream
-- and report the resulting build SCN.
DBMS_CAPTURE_ADM.BUILD(
first_scn => scn);
DBMS_OUTPUT.PUT_LINE('First SCN Value = ' || scn);
END;
/

variable jobno number
begin
dbms_job.submit(:jobno,'strm_build_proc;',sysdate,'sysdate+ 2/24');
commit;
end;
/

Repeat Job Creation for Heart Beat on the Target
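The job below updates SSM.DEST_SRC_HEART_BEAT, so that table must exist on the target first. A minimal sketch mirroring the source-side table (the names simply follow the SRC pattern above):

connect SSM/SSM@PRODDB_TULSA.DOMAIN.COM

drop table SSM.DEST_SRC_HEART_BEAT;
create table SSM.DEST_SRC_HEART_BEAT(DEST_DATE timestamp);
insert into SSM.DEST_SRC_HEART_BEAT select sysdate from dual;
grant insert,update,delete,select on DEST_SRC_HEART_BEAT to STRMADMIN;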
connect STRMADMIN/STRMADMIN@PRODDB_TULSA.DOMAIN.COM

variable jobno number
begin
dbms_job.submit(:jobno,'update ssm.dest_src_heart_beat set dest_date =(select sysdate from dual);',sysdate,'sysdate+300/(60*60*24)');
commit;
end;
/
create or replace procedure strm_build_proc
is
scn NUMBER;
BEGIN
-- Dump a fresh copy of the data dictionary to the redo stream
-- and report the resulting build SCN.
DBMS_CAPTURE_ADM.BUILD(
first_scn => scn);
DBMS_OUTPUT.PUT_LINE('First SCN Value = ' || scn);
END;
/

variable jobno number
begin
dbms_job.submit(:jobno,'strm_build_proc;',sysdate,'sysdate+ 2/24');
commit;
end;
/

Create Conflict Handler on Source for KIOSK_PROFILE (overwrite)
connect STRMADMIN/STRMADMIN@PRODDB_PLANO.DOMAIN.COM

-------CONFLICT RESOLUTION SCRIPT----------


DECLARE
kp_fields DBMS_UTILITY.NAME_ARRAY;
BEGIN
kp_fields(1) := 'profile_id';
kp_fields(2) := 'device_id';
kp_fields(3) := 'device_type';
kp_fields(4) := 'serial_number';
kp_fields(5) := 'airport';
kp_fields(6) := 'area';
kp_fields(7) := 'facility';
kp_fields(8) := 'location';
kp_fields(9) := 'use_actual_departure_time';
kp_fields(10) := 'late_checkin_time';
kp_fields(11) := 'early_checkin_time';
kp_fields(12) := 'baggage_late_checkin_time';
kp_fields(13) := 'nrps_early_checkin_time';
kp_fields(14) := 'nrsa_early_checkin_time';
kp_fields(15) := 'asset_id';
kp_fields(16) := 'operation_hour_start';
kp_fields(17) := 'operation_hour_end';
kp_fields(18) := 'sofware_version';
kp_fields(19) := 'bag_tag_printer_capable';
kp_fields(20) := 'after_midnight_cutoff';
kp_fields(21) := 'primary_km';
kp_fields(22) := 'backup_km';
kp_fields(23) := 'ticket_number';
kp_fields(24) := 'ticket_date';
kp_fields(25) := 'domestic_boarding_time';
kp_fields(26) := 'default_lang';
kp_fields(27) := 'environment';
kp_fields(28) := 'check_bag_only_capable';
kp_fields(29) := 'ticket_stock';
kp_fields(30) := 'ticket_stock_id';
kp_fields(31) := 'baggage_early_checkin_time';
kp_fields(32) := 'baggage_after_midnight_cutoff';
kp_fields(33) := 'international_flag';
kp_fields(34) := 'intl_late_checkin_time';
kp_fields(35) := 'virtual_bag_check';
kp_fields(36) := 'current_tkt_printer';
kp_fields(37) := 'previous_tkt_printer';
kp_fields(38) := 'barcode_2d_capable';
kp_fields(39) := 'intl_baggage_late_checkin_time';
kp_fields(40) := 'dmst_late_checkin_time';
kp_fields(41) := 'dmst_baggage_late_checkin_time';
kp_fields(42) := 'agent_takeover_allowed';
kp_fields(43) := 'client_restart_freq';
kp_fields(44) := 'use_oper_hour_end_for_restart';
kp_fields(45) := 'client_restart_time';

DBMS_APPLY_ADM.SET_UPDATE_CONFLICT_HANDLER(
object_name => 'ssm.kiosk_profile',
method_name => 'overwrite',
resolution_column => 'profile_id',
column_list => kp_fields);
END;
/

Create Conflict Handler on Destination for KIOSK_PROFILE (discard)

connect STRMADMIN/STRMADMIN@PRODDB_TULSA.DOMAIN.COM

------CONFLICT RESOLUTION SCRIPT--------

DECLARE
kp_fields DBMS_UTILITY.NAME_ARRAY;
BEGIN
kp_fields(1) := 'profile_id';
kp_fields(2) := 'device_id';
kp_fields(3) := 'device_type';
kp_fields(4) := 'serial_number';
kp_fields(5) := 'airport';
kp_fields(6) := 'area';
kp_fields(7) := 'facility';
kp_fields(8) := 'location';
kp_fields(9) := 'use_actual_departure_time';
kp_fields(10) := 'late_checkin_time';
kp_fields(11) := 'early_checkin_time';
kp_fields(12) := 'baggage_late_checkin_time';
kp_fields(13) := 'nrps_early_checkin_time';
kp_fields(14) := 'nrsa_early_checkin_time';
kp_fields(15) := 'asset_id';
kp_fields(16) := 'operation_hour_start';
kp_fields(17) := 'operation_hour_end';
kp_fields(18) := 'sofware_version';
kp_fields(19) := 'bag_tag_printer_capable';
kp_fields(20) := 'after_midnight_cutoff';
kp_fields(21) := 'primary_km';
kp_fields(22) := 'backup_km';
kp_fields(23) := 'ticket_number';
kp_fields(24) := 'ticket_date';
kp_fields(25) := 'domestic_boarding_time';
kp_fields(26) := 'default_lang';
kp_fields(27) := 'environment';
kp_fields(28) := 'check_bag_only_capable';
kp_fields(29) := 'ticket_stock';
kp_fields(30) := 'ticket_stock_id';
kp_fields(31) := 'baggage_early_checkin_time';
kp_fields(32) := 'baggage_after_midnight_cutoff';
kp_fields(33) := 'international_flag';
kp_fields(34) := 'intl_late_checkin_time';
kp_fields(35) := 'virtual_bag_check';
kp_fields(36) := 'current_tkt_printer';
kp_fields(37) := 'previous_tkt_printer';
kp_fields(38) := 'barcode_2d_capable';
kp_fields(39) := 'intl_baggage_late_checkin_time';
kp_fields(40) := 'dmst_late_checkin_time';
kp_fields(41) := 'dmst_baggage_late_checkin_time';
kp_fields(42) := 'agent_takeover_allowed';
kp_fields(43) := 'client_restart_freq';
kp_fields(44) := 'use_oper_hour_end_for_restart';
kp_fields(45) := 'client_restart_time';

DBMS_APPLY_ADM.SET_UPDATE_CONFLICT_HANDLER(
object_name => 'ssm.kiosk_profile',
method_name => 'discard',
resolution_column => 'profile_id',
column_list => kp_fields);
END;
/
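
To confirm that the handlers are registered on each database, a quick verification query (the view has one row per handled column; DISTINCT collapses them):

SELECT DISTINCT object_owner, object_name, method_name, resolution_column
FROM dba_apply_conflict_columns;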

Create procedures to print out transactions that show up in DBA_APPLY_ERROR
connect STRMADMIN/STRMADMIN@PRODDB_PLANO.DOMAIN.COM

CREATE OR REPLACE PROCEDURE print_any(data IN SYS.AnyData) IS
tn VARCHAR2(61);
str VARCHAR2(4000);
chr CHAR(255);
num NUMBER;
dat DATE;
rw RAW(4000);
res NUMBER;
BEGIN
IF data IS NULL THEN
DBMS_OUTPUT.PUT_LINE('NULL value');
RETURN;
END IF;
tn := data.GETTYPENAME();
IF tn = 'SYS.VARCHAR2' THEN
res := data.GETVARCHAR2(str);
DBMS_OUTPUT.PUT_LINE(str);
ELSIF tn = 'SYS.CHAR' then
res := data.GETCHAR(chr);
DBMS_OUTPUT.PUT_LINE(chr);
ELSIF tn = 'SYS.VARCHAR' THEN
res := data.GETVARCHAR(chr);
DBMS_OUTPUT.PUT_LINE(chr);
ELSIF tn = 'SYS.NUMBER' THEN
res := data.GETNUMBER(num);
DBMS_OUTPUT.PUT_LINE(num);
ELSIF tn = 'SYS.DATE' THEN
res := data.GETDATE(dat);
DBMS_OUTPUT.PUT_LINE(dat);
ELSIF tn = 'SYS.RAW' THEN
res := data.GETRAW(rw);
DBMS_OUTPUT.PUT_LINE(RAWTOHEX(rw));
ELSE
DBMS_OUTPUT.PUT_LINE('typename is ' || tn);
END IF;
END print_any;
/

CREATE OR REPLACE PROCEDURE print_lcr(lcr IN SYS.ANYDATA) IS
typenm VARCHAR2(61);
ddllcr SYS.LCR$_DDL_RECORD;
proclcr SYS.LCR$_PROCEDURE_RECORD;
rowlcr SYS.LCR$_ROW_RECORD;
res NUMBER;
newlist SYS.LCR$_ROW_LIST;
oldlist SYS.LCR$_ROW_LIST;
ddl_text CLOB;
BEGIN
typenm := lcr.GETTYPENAME();
DBMS_OUTPUT.PUT_LINE('type name: ' || typenm);
IF (typenm = 'SYS.LCR$_DDL_RECORD') THEN
res := lcr.GETOBJECT(ddllcr);
DBMS_OUTPUT.PUT_LINE('source database: ' ||
ddllcr.GET_SOURCE_DATABASE_NAME);
DBMS_OUTPUT.PUT_LINE('owner: ' || ddllcr.GET_OBJECT_OWNER);
DBMS_OUTPUT.PUT_LINE('object: ' || ddllcr.GET_OBJECT_NAME);
DBMS_OUTPUT.PUT_LINE('is tag null: ' || ddllcr.IS_NULL_TAG);
DBMS_LOB.CREATETEMPORARY(ddl_text, TRUE);
ddllcr.GET_DDL_TEXT(ddl_text);
DBMS_OUTPUT.PUT_LINE('ddl: ' || ddl_text);
DBMS_LOB.FREETEMPORARY(ddl_text);
ELSIF (typenm = 'SYS.LCR$_ROW_RECORD') THEN
res := lcr.GETOBJECT(rowlcr);
DBMS_OUTPUT.PUT_LINE('source database: ' ||
rowlcr.GET_SOURCE_DATABASE_NAME);
DBMS_OUTPUT.PUT_LINE('owner: ' || rowlcr.GET_OBJECT_OWNER);
DBMS_OUTPUT.PUT_LINE('object: ' || rowlcr.GET_OBJECT_NAME);
DBMS_OUTPUT.PUT_LINE('is tag null: ' || rowlcr.IS_NULL_TAG);
DBMS_OUTPUT.PUT_LINE('command_type: ' || rowlcr.GET_COMMAND_TYPE);
oldlist := rowlcr.GET_VALUES('OLD');
FOR i IN 1..oldlist.COUNT LOOP
if oldlist(i) is not null then
DBMS_OUTPUT.PUT_LINE('old(' || i || '): ' || oldlist(i).column_name);
print_any(oldlist(i).data);
END IF;
END LOOP;
newlist := rowlcr.GET_VALUES('NEW');
FOR i in 1..newlist.count LOOP
IF newlist(i) IS NOT NULL THEN
DBMS_OUTPUT.PUT_LINE('new(' || i || '): ' || newlist(i).column_name);
print_any(newlist(i).data);
END IF;
END LOOP;
ELSE
DBMS_OUTPUT.PUT_LINE('Non-LCR Message with type ' || typenm);
END IF;
END print_lcr;
/

CREATE OR REPLACE PROCEDURE print_transaction(ltxnid IN VARCHAR2) IS
i NUMBER;
txnid VARCHAR2(30);
source VARCHAR2(128);
msgcnt NUMBER;
errno NUMBER;
errmsg VARCHAR2(128);
lcr SYS.ANYDATA;
BEGIN
SELECT LOCAL_TRANSACTION_ID,
SOURCE_DATABASE,
MESSAGE_COUNT,
ERROR_NUMBER,
ERROR_MESSAGE
INTO txnid, source, msgcnt, errno, errmsg
FROM DBA_APPLY_ERROR
WHERE LOCAL_TRANSACTION_ID = ltxnid;
DBMS_OUTPUT.PUT_LINE('----- Local Transaction ID: ' || txnid);
DBMS_OUTPUT.PUT_LINE('----- Source Database: ' || source);
DBMS_OUTPUT.PUT_LINE('----Error Number: '||errno);
DBMS_OUTPUT.PUT_LINE('----Message Text: '||errmsg);
FOR i IN 1..msgcnt LOOP
DBMS_OUTPUT.PUT_LINE('--message: ' || i);
lcr := DBMS_APPLY_ADM.GET_ERROR_MESSAGE(i, txnid); -- gets the LCR
print_lcr(lcr);
END LOOP;
END print_transaction;
/

CREATE OR REPLACE PROCEDURE print_errors IS
CURSOR c IS
SELECT LOCAL_TRANSACTION_ID,
SOURCE_DATABASE,
MESSAGE_COUNT,
ERROR_NUMBER,
ERROR_MESSAGE
FROM DBA_APPLY_ERROR
ORDER BY SOURCE_DATABASE, SOURCE_COMMIT_SCN;
i NUMBER;
txnid VARCHAR2(30);
source VARCHAR2(128);
msgcnt NUMBER;
errnum NUMBER := 0;
errno NUMBER;
errmsg VARCHAR2(128);
lcr SYS.AnyData;
r NUMBER;
BEGIN
FOR r IN c LOOP
errnum := errnum + 1;
msgcnt := r.MESSAGE_COUNT;
txnid := r.LOCAL_TRANSACTION_ID;
source := r.SOURCE_DATABASE;
errmsg := r.ERROR_MESSAGE;
errno := r.ERROR_NUMBER;
DBMS_OUTPUT.PUT_LINE('*************************************************');
DBMS_OUTPUT.PUT_LINE('----- ERROR #' || errnum);
DBMS_OUTPUT.PUT_LINE('----- Local Transaction ID: ' || txnid);
DBMS_OUTPUT.PUT_LINE('----- Source Database: ' || source);
DBMS_OUTPUT.PUT_LINE('----Error Number: '||errno);
DBMS_OUTPUT.PUT_LINE('----Message Text: '||errmsg);
FOR i IN 1..msgcnt LOOP
DBMS_OUTPUT.PUT_LINE('--message: ' || i);
lcr := DBMS_APPLY_ADM.GET_ERROR_MESSAGE(i, txnid);
print_lcr(lcr);
END LOOP;
END LOOP;
END print_errors;
/

connect STRMADMIN/STRMADMIN@PRODDB_TULSA.DOMAIN.COM

Create the same four procedures (print_any, print_lcr, print_transaction, and print_errors) on the destination, exactly as shown above for the source.


Hints and Tricks: Commands Used to Look into Streams

Capture Side

alter session set nls_date_format='dd-mon-yyyy hh24:mi:ss';
select start_scn, first_scn, captured_scn, applied_scn, status from dba_capture;

--To check if the messages are spilling to disk
select CNUM_MSGS,NUM_MSGS,SPILL_MSGS,CSPILL_MSGS from v$buffered_queues;

--To check for open transactions that have not committed
select * from v$streams_transaction;

The following query confirms whether the required logfile is registered:

select a.name, a.status from v$archived_log a, v$streams_capture s
where s.CAPTURE_MESSAGE_NUMBER
between a.FIRST_CHANGE# and a.NEXT_CHANGE#;

col CONSUMER_NAME format a20
col SOURCE_DATABASE format a20
col MODIFIED_TIME format a20

SELECT r.CONSUMER_NAME,
r.SOURCE_DATABASE,
r.SEQUENCE#,
r.NAME,
r.first_scn,
r.FIRST_TIME,
r.MODIFIED_TIME,
r.DICTIONARY_BEGIN,
r.DICTIONARY_END
FROM DBA_REGISTERED_ARCHIVED_LOG r, DBA_CAPTURE c
WHERE r.CONSUMER_NAME = c.CAPTURE_NAME
ORDER BY source_database, consumer_name, r.first_scn;

Propagation

select propagation_name,status from dba_propagation;
SELECT p.propagation_name, p.SOURCE_QUEUE_OWNER ||'.'||
p.SOURCE_QUEUE_NAME ||'@'||
g.GLOBAL_NAME "Source Queue",
p.DESTINATION_QUEUE_OWNER ||'.'||
p.DESTINATION_QUEUE_NAME ||'@'||
p.DESTINATION_DBLINK "Destination Queue",
p.queue_to_queue,
p.status, p.error_date, p.error_message
FROM DBA_PROPAGATION p, GLOBAL_NAME g;
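
For queue-to-queue propagations, the AQ schedule view can surface scheduling errors as well (a supplementary check):

select qname, destination, schedule_disabled, failures, last_error_msg
from dba_queue_schedules;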


Apply Side

alter session set nls_date_format='dd-mon-yyyy hh24:mi:ss';

Exec DBMS_APPLY_ADM.STOP_APPLY('APPLIES_TULSA');
EXEC DBMS_APPLY_ADM.START_APPLY('APPLIES_TULSA');

select apply_name, apply_captured, status from dba_apply;

-- to check if the messages are being dequeued

SELECT APPLY_NAME, STATE, TOTAL_MESSAGES_DEQUEUED, to_char (DEQUEUE_TIME ,'dd-mon-yyyy hh24:mi:ss') FROM V$STREAMS_APPLY_READER;

select CNUM_MSGS,NUM_MSGS,SPILL_MSGS,CSPILL_MSGS from v$buffered_queues;

select * from dba_apply_progress;

COLUMN PROCESS_NAME HEADING "Coordinator|Process|Name" FORMAT A11
COLUMN SID HEADING 'Session|ID' FORMAT 9999
COLUMN SERIAL# HEADING 'Session|Serial|Number' FORMAT 9999
COLUMN STATE HEADING 'State' FORMAT A21
COLUMN TOTAL_RECEIVED HEADING 'Total|Trans|Received' FORMAT 999999
COLUMN TOTAL_APPLIED HEADING 'Total|Trans|Applied' FORMAT 999999
COLUMN TOTAL_ERRORS HEADING 'Total|Apply|Errors' FORMAT 9999

select SUBSTR(s.PROGRAM,INSTR(S.PROGRAM,'(')+1,4) PROCESS_NAME,
c.SID,
c.SERIAL#,
c.STATE,
c.TOTAL_RECEIVED,
c.TOTAL_APPLIED,
c.TOTAL_ERRORS
FROM V$STREAMS_APPLY_COORDINATOR c, V$SESSION s
WHERE c.APPLY_NAME = 'APPLIES_TULSA' AND
c.SID = s.SID AND
c.SERIAL# = s.SERIAL#
/

Is an Apply Server Performing Poorly for Certain Transactions?
If an apply process is not performing well, then the reason might be that one or more apply servers used by the apply process are taking an inordinate amount of time to apply certain transactions. The following query displays information about the transactions being applied by each apply server used by an apply process named "applies_tulsa":
COLUMN SERVER_ID HEADING 'Apply Server ID' FORMAT 99999999
COLUMN STATE HEADING 'Apply Server State' FORMAT A20
COLUMN APPLIED_MESSAGE_NUMBER HEADING 'Applied Message|Number' FORMAT 99999999
COLUMN MESSAGE_SEQUENCE HEADING 'Message Sequence|Number' FORMAT 99999999

SELECT SERVER_ID, STATE, APPLIED_MESSAGE_NUMBER, MESSAGE_SEQUENCE
FROM V$STREAMS_APPLY_SERVER
WHERE APPLY_NAME = 'APPLIES_TULSA'
ORDER BY SERVER_ID;

If you run this query repeatedly, then over time the apply server state, applied message number, and message sequence number should continue to change for each apply server as it applies transactions. If these values do not change for one or more apply servers, then the apply server might not be performing well. In this case, you should make sure that, for each table to which the apply process applies changes, every key column has an index.

If you have many such tables, then you might need to determine the specific table and DML or DDL operation that is causing an apply server to perform poorly. To do so, run the following query when an apply server is taking an inordinately long time to apply a transaction. In this example, assume that the name of the apply process is "applies_tulsa" and that apply server number two is performing poorly:
COLUMN OPERATION HEADING 'Operation' FORMAT A20
COLUMN OPTIONS HEADING 'Options' FORMAT A20
COLUMN OBJECT_OWNER HEADING 'Object|Owner' FORMAT A10
COLUMN OBJECT_NAME HEADING 'Object|Name' FORMAT A10
COLUMN COST HEADING 'Cost' FORMAT 99999999

SELECT p.OPERATION, p.OPTIONS, p.OBJECT_OWNER, p.OBJECT_NAME, p.COST
FROM V$SQL_PLAN p, V$SESSION s, V$STREAMS_APPLY_SERVER a
WHERE a.APPLY_NAME = 'APPLIES_TULSA' AND a.SERVER_ID = 2
AND s.SID = a.SID
AND p.HASH_VALUE = s.SQL_HASH_VALUE;

This query returns the operation currently being performed by the specified apply server, along with the owner and name of the table on which the operation is being performed and the cost of the operation. Make sure each key column in this table has an index. If the OPTIONS column shows FULL, then the operation is causing full table scans, and indexing the table's key columns might solve the problem.
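
To review the indexing on a suspect table, a lookup such as the following can help (SSM.KIOSK_PROFILE is used purely as an example):

select index_name, column_name, column_position
from dba_ind_columns
where table_owner = 'SSM' and table_name = 'KIOSK_PROFILE'
order by index_name, column_position;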

In addition, you can run the following query to determine the specific DML or DDL SQL statement that is causing an apply server to perform poorly, assuming that the name of the apply process is "applies_tulsa" and that apply server number two is performing poorly:

SELECT t.SQL_TEXT
FROM V$SESSION s, V$SQLTEXT t, V$STREAMS_APPLY_SERVER a
WHERE a.APPLY_NAME = 'APPLIES_TULSA' AND a.SERVER_ID = 2
AND s.SID = a.SID
AND s.SQL_ADDRESS = t.ADDRESS
AND s.SQL_HASH_VALUE = t.HASH_VALUE
ORDER BY PIECE;

This query returns the SQL statement being run currently by the specified apply server. The statement includes the name of the table to which the transaction is being applied. Make sure each key column in this table has an index.

If the SQL statement returned by the previous query is less than one thousand characters long, then you can run the following simplified query instead:

SELECT t.SQL_TEXT
FROM V$SESSION s, V$SQLAREA t, V$STREAMS_APPLY_SERVER a
WHERE a.APPLY_NAME = 'APPLIES_TULSA' AND a.SERVER_ID = 2
AND s.SID = a.SID
AND s.SQL_ADDRESS = t.ADDRESS
AND s.SQL_HASH_VALUE = t.HASH_VALUE;


To delete all errors, or a specific transaction, in DBA_APPLY_ERROR

exec dbms_apply_adm.delete_all_errors('&APPLY_NAME');

For a specific transaction

exec dbms_apply_adm.delete_error('&LOCAL_TRANSACTION_ID');


To check what SQL is in a long-running transaction, start LogMiner

alter session set nls_date_format='dd-mon-yyyy hh24:mi:ss';

-- &commitscn is obtained from the Streams health check
select name, sequence# from v$archived_log
where &commitscn between first_change# and next_change#;

From the NAME column in the output above, pass the logfile to the commands below:

exec dbms_logmnr.add_logfile('&1');
exec dbms_logmnr.start_logmnr(options => dbms_logmnr.DICT_FROM_ONLINE_CATALOG);


select XIDUSN || '.' || XIDSLT || '.' || XIDSQN from v$streams_transaction ;

select timestamp,commit_timestamp,SQL_REDO from v$logmnr_contents where XIDUSN || '.' || XIDSLT || '.' || XIDSQN = '4.1.93557';

exec dbms_logmnr.end_logmnr();


To print out the transactions that have errored out in DBA_APPLY_ERROR

select LOCAL_TRANSACTION_ID from dba_apply_error;

SET SERVEROUTPUT ON SIZE 1000000
EXEC print_transaction('&LOCAL_TRANSACTION_ID');



To Skip the current Transaction

On the Capture side

exec dbms_capture_adm.set_parameter('&capture_name', '_ignore_transaction', '&txn_id');

The txn_id is obtained from the following command:

select XIDUSN || '.' || XIDSLT || '.' || XIDSQN from v$streams_transaction;

exec dbms_capture_adm.stop_capture('&capture_name');
exec dbms_capture_adm.start_capture('&capture_name');


On the Apply side

exec dbms_apply_adm.set_parameter('&apply_name', '_ignore_transaction', '&txn_id');
exec dbms_apply_adm.stop_apply('&apply_name');
exec dbms_apply_adm.start_apply('&apply_name');


Periodic Maintenance

Checkpoint periodically

Periodically force the capture process to take a checkpoint. This checkpoint is not the same as a database checkpoint. To force a capture checkpoint, set the capture parameter _CHECKPOINT_FORCE to Y.

Forcing a checkpoint ensures that the DBA_CAPTURE view columns CAPTURED_SCN and APPLIED_SCN are maintained.
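
For example, using the capture process defined earlier in this article:

exec dbms_capture_adm.set_parameter('CAPTURES_PLANO','_CHECKPOINT_FORCE','Y');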


Perform periodic maintenance

1. Move FIRST_SCN. Raise the capture process FIRST_SCN (the lowest SCN available for capturing changes) so that Streams metadata tables can be purged and space in the SYSAUX tablespace reclaimed.

2. Dump a fresh copy of the dictionary to redo. Issue a DBMS_CAPTURE_ADM.BUILD command to dump a current copy of the data dictionary to the redo logs. This reduces the number of logs that must be processed if an additional capture process is created or an existing one is rebuilt.

3. Prepare database objects for instantiation. Issue DBMS_CAPTURE_ADM.PREPARE_GLOBAL_INSTANTIATION. This is used in conjunction with the BUILD in step 2 for new capture creation or rebuild purposes. A sketch of these calls follows this list.
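
A sketch of these maintenance calls, run as the Streams administrator (the new FIRST_SCN must correspond to an existing dictionary build; the substitution variable is illustrative):

SET SERVEROUTPUT ON
DECLARE
  l_scn NUMBER;
BEGIN
  -- Step 2: dump a fresh copy of the dictionary to redo and report the build SCN
  DBMS_CAPTURE_ADM.BUILD(first_scn => l_scn);
  DBMS_OUTPUT.PUT_LINE('Build SCN: ' || l_scn);
  -- Step 3: prepare all database objects for instantiation
  DBMS_CAPTURE_ADM.PREPARE_GLOBAL_INSTANTIATION;
END;
/

-- Step 1: move FIRST_SCN forward once a newer build SCN is available
exec dbms_capture_adm.alter_capture(capture_name => 'CAPTURES_PLANO', first_scn => &new_first_scn);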

Additional Recommendations

1. Make sure there are no network issues. If network problems are suspected, install OSWatcher (Metalink Note 301137.1) and upload its output to Oracle Support for further analysis.
2. Check for batch jobs that commit in batches of more than 1000 rows; if found, reduce the commit point to less than 1000.
3. Test the environment thoroughly.
