среда, 22 марта 2017 г.

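Use parallel execution: run a stored procedure concurrently as several DBMS_SCHEDULER jobs and wait for all of them via DBMS_PIPE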


-- Stored procedure to be run from a scheduler job: executes the actual work
-- (ibm_odm_test.init) and then reports the outcome on a DBMS_PIPE pipe so that
-- the launching session can synchronize on job completion.
--
-- Parameters:
--   in_pipe_name  name of the pipe the status message is sent to
--   in_job        label identifying this job; appended to the status message
--
-- Raises:
--   * whatever ibm_odm_test.init raises; it is re-raised unchanged after a
--     ':Failed ' status message has been attempted
--   * ORA-20001 when the work succeeded but the success message could not be sent
CREATE OR REPLACE PROCEDURE PROC_DELETE_TEST_BONUS_CONCUR(in_pipe_name IN VARCHAR2,
    in_job IN VARCHAR2)
IS
  /**
   * Sends SYSDATE || in_status || in_job as a single-item message on in_pipe_name.
   * Raises ORA-20001 when SEND_MESSAGE reports a non-zero status.
   */
  PROCEDURE send_status(in_status IN VARCHAR2)
  IS
    flag INTEGER;
  BEGIN
    -- Start from an empty local buffer so the message holds exactly one item,
    -- even if an earlier pack/send in this session was left unfinished.
    DBMS_PIPE.RESET_BUFFER;
    DBMS_PIPE.PACK_MESSAGE(SYSDATE || in_status || in_job);

    -- Send message, success is a zero return value.
    flag := DBMS_PIPE.SEND_MESSAGE(in_pipe_name);
    IF flag != 0 THEN
      raise_application_error(-20001,
        'Error: ' || TO_CHAR(flag) || ' sending on pipe ' || in_pipe_name);
    END IF;
  END send_status;
BEGIN
  -- Execute the actual work. Only failures of the work itself are reported
  -- as ':Failed '; a failure to notify is handled separately below.
  BEGIN
    ibm_odm_test.init;
  EXCEPTION
  WHEN OTHERS THEN
    -- Best-effort failure signal: a problem while notifying must not
    -- replace the original error, which is what the job log should show.
    BEGIN
      send_status(':Failed ');
    EXCEPTION
    WHEN OTHERS THEN
      NULL;
    END;
    RAISE;
  END;

  -- Signal completion. If this cannot be delivered the procedure fails, so the
  -- problem is visible in the job log instead of surfacing only as a timeout
  -- in the waiting session.
  send_status(': Success ');
END;
/


--
-- Run Jobs
--
DECLARE
  -- Scheduler program that wraps the stored procedure PROC_DELETE_TEST_BONUS_CONCUR.
  c_program_name CONSTANT VARCHAR2(30) := 'PROG_DELETE_TEST_BONUS_CONCUR';

  jobs_amount NUMBER := 0;      -- number of jobs started, i.e. messages to wait for
  retval      INTEGER;          -- status returned by the DBMS_PIPE calls
  message     VARCHAR2(4000);   -- status text received from a job
  pipe_name   VARCHAR2(128);    -- session-unique pipe used as the completion channel

/**
 * Creates and enables the scheduler program that calls the stored procedure
 * PROC_DELETE_TEST_BONUS_CONCUR with two VARCHAR2 arguments
 * (in_pipe_name, in_job), so that it can be run as a job.
 */
PROCEDURE create_prog_delete_test_bonus
IS
  -- ORA-27476: the scheduler object to be dropped does not exist.
  e_program_missing EXCEPTION;
  PRAGMA EXCEPTION_INIT(e_program_missing, -27476);
BEGIN
  -- define new in each run in order to ease development. TODO Once it works, no need to redefine for each run!
  BEGIN
    dbms_scheduler.drop_program(program_name => c_program_name, force => TRUE);
  EXCEPTION
  WHEN e_program_missing THEN
    -- First run: there is nothing to drop yet, carry on with the creation.
    NULL;
  END;

  -- Created disabled: arguments must be defined before the program can be enabled.
  dbms_scheduler.create_program(
    program_name        => c_program_name,
    program_action      => 'PROC_DELETE_TEST_BONUS_CONCUR',
    program_type        => 'STORED_PROCEDURE',
    number_of_arguments => 2,
    enabled             => FALSE);

  dbms_scheduler.define_program_argument(
    program_name      => c_program_name,
    argument_position => 1,
    argument_name     => 'in_pipe_name',
    argument_type     => 'VARCHAR2');
  dbms_scheduler.define_program_argument(
    program_name      => c_program_name,
    argument_position => 2,
    argument_name     => 'in_job',
    argument_type     => 'VARCHAR2');

  dbms_scheduler.enable(c_program_name);
END create_prog_delete_test_bonus;

/**
 * "Forks" a job that runs the program PROG_DELETE_TEST_BONUS_CONCUR.
 *
 * in_pipe_name   pipe the job reports its status on
 * in_job         label passed to the job; it is echoed in the status message
 * io_job_amount  counter of started jobs, incremented by one per call
 */
PROCEDURE RUN_TEST_BONUS_JOB(
    in_pipe_name IN VARCHAR2,
    in_job    IN VARCHAR2,
    io_job_amount IN OUT NUMBER)
IS
  jobname VARCHAR2(100);
BEGIN
  jobname := DBMS_SCHEDULER.GENERATE_JOB_NAME;

  dbms_scheduler.create_job(job_name => jobname, program_name => c_program_name);
  dbms_scheduler.set_job_argument_value(
    job_name       => jobname,
    argument_name  => 'in_pipe_name',
    argument_value => in_pipe_name);
  dbms_scheduler.set_job_argument_value(
    job_name       => jobname,
    argument_name  => 'in_job',
    argument_value => in_job);
  dbms_output.put_line(SYSDATE || ': Running job: '|| jobname);

  -- use_current_session => FALSE: do not run the job in this session,
  -- so that several jobs can execute concurrently.
  dbms_scheduler.run_job(jobname, FALSE);
  io_job_amount := io_job_amount + 1;
END RUN_TEST_BONUS_JOB;

/** Removes the completion pipe and logs the status returned by REMOVE_PIPE. */
PROCEDURE remove_pipe
IS
BEGIN
  retval := DBMS_PIPE.REMOVE_PIPE(pipe_name);
  dbms_output.put_line(SYSDATE || ': REMOVE_PIPE: ''' || pipe_name || ''' returned: ' ||retval);
END remove_pipe;


-- Anonymous "Main" block
BEGIN
  -- Resolve the pipe name once, before anything can fail, so that the
  -- exception handler always knows which pipe to remove.
  pipe_name := DBMS_PIPE.UNIQUE_SESSION_NAME;

  delete histerr where module='ibm_odm_test';

  ibm_odm_test.init;
  create_prog_delete_test_bonus;

  -- Explicitly create the pipe. NOTE: private => FALSE makes this a PUBLIC
  -- pipe; only its session-unique name keeps other sessions from using it.
  retval := DBMS_PIPE.CREATE_PIPE(pipe_name, 100, private => FALSE);
  dbms_output.put_line(SYSDATE || ': Created pipe: ''' || pipe_name || ''' returned ' ||retval);

  -- start concurrent processing
  RUN_TEST_BONUS_JOB(pipe_name, 'A', jobs_amount);
  RUN_TEST_BONUS_JOB(pipe_name, 'B', jobs_amount);
  RUN_TEST_BONUS_JOB(pipe_name, 'C', jobs_amount);

  -- "Barrier": wait for one status message per started job
  FOR i IN 1 .. jobs_amount LOOP
    -- Reset the local buffer.
    DBMS_PIPE.RESET_BUFFER;

    -- Wait and receive message. Timeout after an hour.
    retval := SYS.DBMS_PIPE.RECEIVE_MESSAGE(pipe_name, 3600);
    -- Handle errors: timeout, etc.
    IF retval != 0 THEN
      raise_application_error(-20000, 'Error: '||to_char(retval)||' receiving on pipe. See Job Log in table user_scheduler_job_run_details');
    END IF;
    -- Read message from local buffer.
    DBMS_PIPE.UNPACK_MESSAGE(message);
    dbms_output.put_line(SYSDATE || ': Received message on '''|| pipe_name ||''' (Status='|| retval ||'): ' || message);
  END LOOP;

  remove_pipe;
EXCEPTION
WHEN OTHERS THEN
  dbms_output.put_line(SYSDATE ||  SUBSTR(SQLERRM, 1, 1000) || ' ' ||
                                          SUBSTR(DBMS_UTILITY.FORMAT_ERROR_BACKTRACE, 1, 1000));
  -- Always release the pipe, then let the error reach the caller.
  -- NOTE: per-job data clean-up (PROC_DELETE_TEST_BONUS for 'A', 'B', 'C') was
  -- disabled in the original script and is still not performed here.
  remove_pipe;
  RAISE;
END;
/