• Topic
  • Discussion
  • VOS.VirtBulkRDFLoaderScript(1.1) -- DAVWikiAdmin? , 2017-06-13 05:42:22 Edit WebDAV System Administrator 2017-06-13 05:42:22

    Loading the Bulk Loader Procedure and Sub-procedures into Virtuoso

    How To Load the Procedures

    The command line isql program is the easiest way to load the Bulk Loader Procedure and Sub-procedures into a Virtuoso instance.

    This can be done by saving the SQL script below into a file (rdfloader.sql, for example) and using the isql LOAD command as shown here:


    $ /opt/virtuoso/bin/isql 1111
    Connected to OpenLink Virtuoso
    Driver: 06.01.3127 OpenLink Virtuoso ODBC Driver
    OpenLink Interactive SQL (Virtuoso), version 0.9849b.
    Type HELP; for help and EXIT; to exit.
    
    SQL> LOAD rdfloader.sql;
    
    Done. -- 5032 msec.
    
    Done. -- 999 msec.
    
    Done. -- 466 msec.
    
    Done. -- 10 msec.
    
    Done. -- 1297 msec.
    
    Done. -- 101 msec.
    
    Done. -- 94 msec.
    
    Done. -- 81 msec.
    
    Done. -- 105 msec.
    
    Done. -- 440 msec.
    
    Done. -- 69 msec.
    
    Done. -- 145 msec.
    
    Done. -- 443 msec.
    
    Done. -- 81 msec.
    
    Done. -- 60 msec.
    
    Done. -- 121 msec.
    
    Done. -- 103 msec.
    SQL> 
    

    Bulk Loader Procedure and Sub-procedures creation SQL script


    -- Work queue of the bulk loader: one row per file to be loaded.
    --   ll_file      path of the file, also the primary key
    --   ll_graph     graph IRI the file is loaded into
    --   ll_state     progress marker, see the inline note below. The error
    --                handler in ld_file also stores 2 together with ll_error,
    --                so 2 means finished, not necessarily loaded successfully
    --   ll_started   set by ld_array when the file is claimed
    --   ll_done      set when loading finished or failed
    --   ll_host      set by ld_array from sys_stat cl_this_host when the server
    --                is not running local only
    --   ll_work_time not written by any procedure in this script
    --   ll_error     SQL state and message recorded by ld_file on failure
    -- A row with ll_file equal to ##stop is used as a stop signal by
    -- rdf_load_stop and rdf_loader_run.
    CREATE TABLE load_list (
      ll_file      VARCHAR,
      ll_graph     VARCHAR,
      ll_state     INT DEFAULT 0, -- 0 not started, 1 going, 2 done
      ll_started   DATETIME,
      ll_done      DATETIME,
      ll_host      INT,
      ll_work_time INTEGER,
      ll_error     VARCHAR,
      PRIMARY KEY (ll_file))
    ALTER INDEX load_list ON load_list PARTITION (ll_file VARCHAR)
    ;
    
    -- Secondary index used by ld_array to find the files that are still in
    -- state 0 without scanning the whole table.
    CREATE INDEX ll_state ON load_list (ll_state, ll_file, ll_graph) PARTITION (ll_state INT)
    ;
    
    
    -- Single row table used as a mutex: rdf_loader_run selects the row with
    -- id 0 FOR UPDATE before calling ld_array, so only one loader at a time
    -- claims a batch.
    CREATE TABLE ldlock (id INT PRIMARY KEY)
      ALTER INDEX ldlock ON ldlock PARTITION (id INT)
    ;
    
    -- Seed the one lock row that rdf_loader_run expects to find.
    INSERT INTO ldlock VALUES (0);
    
    
    CREATE PROCEDURE
    ld_dir (IN path VARCHAR, IN mask VARCHAR, IN graph VARCHAR)
    {
      -- Registers the files of one directory in DB.DBA.LOAD_LIST.
      --
      --   path   directory to scan (not recursive, see ld_dir_all)
      --   mask   LIKE pattern a file name must match to be registered
      --   graph  fallback graph IRI when no .graph file applies
      --
      -- The graph for each file is taken from the first of these that exists:
      --   1. <file name without .gz>.graph
      --   2. the same name with a -<digits> part in front of .n removed,
      --      so the parts of a split dump can share one .graph file
      --   3. global.graph in the same directory
      --   4. the graph argument
      -- Files already present in the load list are left untouched, and a file
      -- whose resolved graph is NULL is not registered.
      -- Each matching file is handled in its own serializable transaction.
      DECLARE entries ANY;
      DECLARE pos INT;
      DECLARE full_name, base_name, own_graph_file, series_graph_file, target_graph VARCHAR;

      -- second argument 1: presumably restricts the listing to plain files,
      -- ld_dir_all uses 0 for sub-directories -- confirm against sys_dirlist docs
      entries := sys_dirlist (path, 1);

      pos := 0;
      WHILE (pos < LENGTH (entries))
        {
          IF (entries[pos] LIKE mask)
            {
              SET ISOLATION = 'serializable';

              full_name := path || '/' || entries[pos];
              IF (NOT (EXISTS (SELECT 1 FROM DB.DBA.LOAD_LIST WHERE LL_FILE = full_name FOR UPDATE)))
                {
                  base_name := REPLACE (entries[pos], '.gz', '');
                  own_graph_file := path || '/' || base_name || '.graph';
                  series_graph_file := path || '/' || regexp_replace (base_name, '\\-[0-9]+\\.n', '.n') || '.graph';

                  IF (file_stat (own_graph_file) <> 0)
                    target_graph := TRIM (file_to_string (own_graph_file), ' \r\n');
                  ELSE IF (file_stat (series_graph_file) <> 0)
                    target_graph := TRIM (file_to_string (series_graph_file), ' \r\n');
                  ELSE IF (file_stat (path || '/' || 'global.graph') <> 0)
                    target_graph := TRIM (file_to_string (path || '/' || 'global.graph'), ' \r\n');
                  ELSE
                    target_graph := graph;

                  IF (target_graph IS NOT NULL)
                    {
                      INSERT INTO DB.DBA.LOAD_LIST (ll_file, ll_graph) VALUES (full_name, target_graph);
                    }
                }

              COMMIT WORK;
            }
          pos := pos + 1;
        }
    }
    ;
    
    
    CREATE PROCEDURE
    rdf_read_dir (IN path VARCHAR, IN mask VARCHAR, IN graph VARCHAR)
    {
      -- Alias for ld_dir: registers the files in directory path whose names
      -- match the LIKE pattern mask in DB.DBA.LOAD_LIST, with graph as the
      -- fallback graph IRI.
      --
      -- This used to call ld_dirr, a procedure this script never creates, so
      -- every call failed at run time. ld_dir has the same signature.
      -- NOTE(review): if a recursive scan was intended, ld_dir_all is the
      -- alternative target -- confirm with the callers.
      ld_dir (path, mask, graph);
    }
    ;
    
    CREATE PROCEDURE 
    ld_dir_all (IN path VARCHAR, IN mask VARCHAR, IN graph VARCHAR)
    {
      -- Recursive variant of ld_dir: registers the matching files of path
      -- itself and then descends into every entry returned by
      -- sys_dirlist (path, 0), skipping the . and .. entries.
      --
      --   path   root directory of the scan
      --   mask   LIKE pattern passed through to ld_dir
      --   graph  fallback graph IRI passed through to ld_dir
      DECLARE subdirs, child ANY;
      DECLARE pos INT;

      -- the listing is taken before ld_dir runs, as in the original order
      subdirs := sys_dirlist (path, 0);
      ld_dir (path, mask, graph);

      pos := 0;
      WHILE (pos < LENGTH (subdirs))
        {
          child := subdirs[pos];
          IF (child <> '.' AND child <> '..')
            {
              ld_dir_all (path || '/' || child, mask, graph);
            }
          pos := pos + 1;
        }
    }
    ;
    
    CREATE PROCEDURE
    ld_add (IN _fname VARCHAR, IN _graph VARCHAR)
    {
      -- Registers a single file in DB.DBA.LOAD_LIST unless it is already there.
      --
      --   _fname  path of the file, used as the LL_FILE key
      --   _graph  graph IRI the file should be loaded into
      --
      -- The existence check and the insert run in one serializable
      -- transaction, which is committed before returning in either case.
      SET ISOLATION = 'serializable';

      IF (EXISTS (SELECT 1 FROM DB.DBA.LOAD_LIST WHERE LL_FILE = _fname FOR UPDATE))
        {
          -- already queued, nothing to add
          COMMIT WORK;
          RETURN;
        }

      INSERT INTO DB.DBA.LOAD_LIST (LL_FILE, LL_GRAPH)
        VALUES (_fname, _graph);
      COMMIT WORK;
    }
    ;
    
    CREATE PROCEDURE 
    ld_ttlp_flags (IN fname VARCHAR)
    {
      -- Returns the flags value that ld_file passes to TTLP for a file name.
      --
      --   fname  file name, for gzipped input the name without the .gz suffix
      --
      -- The base value is 255. 512 is added when the name contains /btc-2009
      -- or .nq anywhere, or ends in .n4.
      -- NOTE(review): 512 presumably switches the parser to a quad-aware
      -- mode -- confirm against the TTLP flags documentation.
      DECLARE flags INT;

      flags := 255;
      IF (fname LIKE '%/btc-2009%' OR fname LIKE '%.nq%' OR fname LIKE '%.n4')
        flags := flags + 512;
      RETURN flags;
    }
    ;
    
    CREATE PROCEDURE
    ld_file (IN f VARCHAR, IN graph VARCHAR)
    {
      -- Loads one file into the RDF store, choosing the loader from its name.
      --
      --   f      path of the file to load
      --   graph  graph IRI, passed both as base and as target graph
      --
      -- Dispatch:
      --   names ending in .grdf or .grdf.gz  go to load_grdf
      --   names ending in .xml, .owl or .rdf go to DB.DBA.RDF_LOAD_RDFXML
      --   everything else                    goes to TTLP, with the flags
      --                                      chosen by ld_ttlp_flags
      -- For names ending in .gz the file is opened with gz_file_open and the
      -- format is decided from the name with the .gz suffix removed.
      --
      -- Any SQL error rolls the load back, marks the row in DB.DBA.LOAD_LIST
      -- as state 2 with the error text in LL_ERROR, writes a log line and
      -- returns normally, so the caller does not see the error.
      DECLARE plain_name VARCHAR;
      DECLARE is_gz, is_rdfxml INT;
      DECLARE EXIT HANDLER FOR SQLSTATE '*' {
        ROLLBACK WORK;
        UPDATE DB.DBA.LOAD_LIST
          SET LL_STATE = 2,
              LL_DONE = CURDATETIME (),
              LL_ERROR = __sql_state || ' ' || __sql_message
          WHERE LL_FILE = f;
        COMMIT WORK;

        log_message (sprintf (' File %s error %s %s', f, __sql_state, __sql_message));
        RETURN;
      };

      IF (f LIKE '%.grdf' OR f LIKE '%.grdf.gz')
        {
          load_grdf (f);
          RETURN;
        }

      IF (f LIKE '%.gz')
        {
          is_gz := 1;
          -- \x24 stands for the end-of-string anchor
          plain_name := regexp_replace (f, '\.gz\x24', '');
        }
      ELSE
        {
          is_gz := 0;
          plain_name := f;
        }

      IF (plain_name LIKE '%.xml' OR plain_name LIKE '%.owl' OR plain_name LIKE '%.rdf')
        is_rdfxml := 1;
      ELSE
        is_rdfxml := 0;

      IF (is_gz = 1 AND is_rdfxml = 1)
        DB.DBA.RDF_LOAD_RDFXML (gz_file_open (f), graph, graph);
      ELSE IF (is_gz = 1)
        TTLP (gz_file_open (f), graph, graph, ld_ttlp_flags (plain_name));
      ELSE IF (is_rdfxml = 1)
        DB.DBA.RDF_LOAD_RDFXML (file_open (f), graph, graph);
      ELSE
        TTLP (file_open (f), graph, graph, ld_ttlp_flags (plain_name));
    }
    ;
    
    CREATE PROCEDURE
    rdf_load_dir (IN path VARCHAR,
                  IN mask VARCHAR := '%.nt',
                  IN graph VARCHAR := 'http://dbpedia.org')
    {
      -- One-call convenience entry point: registers the matching files of a
      -- directory and then runs the loader in the current session.
      --
      --   path   directory to scan (not recursive)
      --   mask   LIKE pattern for file names
      --   graph  fallback graph IRI
      --
      -- A stop marker left in the load list by rdf_load_stop is removed first.
      DELETE FROM DB.DBA.LOAD_LIST
        WHERE LL_FILE = '##stop';
      COMMIT WORK;

      ld_dir (path, mask, graph);
      rdf_loader_run ();
    }
    ;
    
    
    CREATE PROCEDURE 
    ld_array ()
    {
      -- Claims the next batch of files that are waiting to be loaded.
      --
      -- Reads up to 100 rows with LL_STATE = 0 through the ll_state index and
      -- stops early once the sizes reported by file_stat (f, 1) add up to more
      -- than 2000000. The claimed rows are set to state 1 with LL_STARTED set
      -- to now. LL_HOST is set from sys_stat cl_this_host unless the server
      -- runs local only.
      --
      -- Returns 0 when no file is waiting. Otherwise returns a 100 element
      -- array whose leading slots hold VECTOR (file, graph) pairs. Unused
      -- slots keep the value make_array gave them, which rdf_loader_run
      -- tests against 0.
      --
      -- The caller is expected to hold the ldlock row and to commit afterwards,
      -- as rdf_loader_run does.
      DECLARE first, last, arr, len, local ANY;
      DECLARE cr CURSOR FOR
          SELECT TOP 100 LL_FILE, LL_GRAPH
            FROM DB.DBA.LOAD_LIST TABLE OPTION (INDEX ll_state)
            WHERE LL_STATE = 0
            FOR UPDATE;
      DECLARE fill INT;
      DECLARE f, g VARCHAR;

      -- end of the cursor simply ends the batch
      WHENEVER NOT FOUND GOTO done;

      first := 0;
      last := 0;
      arr := make_array (100, 'any');
      fill := 0;
      len := 0;
      OPEN cr;
      FOR (;;)
        {
          FETCH cr INTO f, g;
          IF (0 = first) first := f;
          last := f;
          arr[fill] := VECTOR (f, g);
          -- NOTE(review): file_stat with second argument 1 is taken to be the
          -- size in bytes -- confirm against the file_stat documentation
          len := len + CAST (file_stat (f, 1) AS INT);
          fill := fill + 1;
          IF (len > 2000000)
            GOTO done;
        }

     done:
      IF (0 = first)
        RETURN 0;

      IF (1 <> sys_stat ('cl_run_local_only'))
        local := sys_stat ('cl_this_host');

      -- The batch is marked with a range on LL_FILE, which presumably relies on
      -- the cursor returning rows in LL_FILE order through the ll_state index.
      -- The LL_STATE = 0 filter keeps the range from touching rows that are
      -- already done or in progress and merely sort between first and last.
      -- Without it such rows were reset to state 1 and never finished again.
      UPDATE DB.DBA.LOAD_LIST
        SET LL_STATE = 1, LL_STARTED = CURDATETIME (), LL_HOST = local
        WHERE LL_FILE >= first AND LL_FILE <= last AND LL_STATE = 0;

      RETURN arr;
    }
    ;
    
    -- Main loader loop: repeatedly claims a batch of files with ld_array and
    -- loads each one with ld_file until the load list has no file in state 0,
    -- a ##stop row appears, or the max_files budget is used up.
    --
    --   max_files   optional limit. NULL means no limit. The budget is reduced
    --               by 100 after every batch regardless of how many files the
    --               batch really held, so the limit is only approximate.
    --   log_enable  mode passed to log_enable () while files are being loaded.
    --               Its lowest bit is used as the mode for the bookkeeping
    --               transactions between batches.
    --
    -- Returns nothing. Progress and termination are reported via log_message.
    CREATE PROCEDURE
    rdf_loader_run (IN max_files INTEGER := NULL, IN log_enable INT := 2)
    {
      DECLARE sec_delay float;
      -- NOTE(review): _f and _graph are declared but never used below
      DECLARE _f, _graph VARCHAR;
      DECLARE arr ANY;
      DECLARE xx, inx, tx_mode, ld_mode INT;
      ld_mode := log_enable;
      -- Cluster only setup, issued from host 1 through cl_exec
      IF (0 = sys_stat ('cl_run_local_only'))
        {
          IF (log_enable = 2 AND cl_this_host () = 1)
    	{
    	  cl_exec ('checkpoint_interval (0)');
    	  cl_exec ('__dbf_set (''cl_non_logged_write_mode'', 1)');
    	}
          IF (cl_this_host () = 1)
    	cl_exec('__dbf_set(''cl_max_keep_alives_missed'',3000)');
        }
      tx_mode := bit_and (1, log_enable);
      log_message ('Loader started');
    
      -- Clear a stop marker left over from an earlier rdf_load_stop call
      DELETE FROM DB.DBA.LOAD_LIST WHERE LL_FILE = '##stop';
      COMMIT WORK;
    
      WHILE (1)
        {
          SET ISOLATION = 'repeatable';
          -- SQL state 40001 is reported as a deadlock in the log line below:
          -- roll back, wait a random time below one second, then retry the
          -- iteration from the again label
          DECLARE exit handler FOR sqlstate '40001' {
    	ROLLBACK WORK;
            sec_delay := RND(1000)*0.001;
    	log_message(sprintf('deadlock in loader, waiting %d milliseconds', CAST (sec_delay * 1000 AS INTEGER)));
    	delay(sec_delay);
    	GOTO again;
          };
    
         again:;
    
          -- Stop marker inserted by rdf_load_stop
          IF (EXISTS (SELECT 1 FROM DB.DBA.LOAD_LIST WHERE LL_FILE = '##stop'))
    	{
    	  log_message ('File load stopped by rdf_load_stop.');
    	  RETURN;
    	}
    
          log_enable (tx_mode, 1);
    
          IF (max_files IS NOT NULL AND max_files <= 0)
            {
    	  COMMIT WORK;
    	  log_message ('Max_files reached. Finishing.');
              RETURN;
    	}
    
          -- A missing ldlock row ends the run the same way as an empty queue
          WHENEVER NOT FOUND GOTO looks_empty;
    
          --      log_message ('Getting next file.');
          -- Take the ldlock row so that only one loader at a time runs ld_array,
          -- the lock is released by the COMMIT right after
          SET ISOLATION = 'serializable';
          SELECT id INTO xx FROM ldlock WHERE id = 0 FOR UPDATE;
          arr := ld_array ();
          COMMIT WORK;
          IF (0 = arr)
    	GOTO looks_empty;
          log_enable (ld_mode, 1);
    
          -- arr has 100 slots, the first slot equal to 0 marks the end of the batch
          FOR (inx := 0; inx < 100; inx := inx + 1)
    	{
    	  IF (0 = arr[inx])
    	    GOTO arr_done;
    	  ld_file (arr[inx][0], arr[inx][1]);
    	  -- ld_file returns normally even when its error handler fired, so this
    	  -- runs for failed files too and leaves any LL_ERROR value in place
    	  UPDATE DB.DBA.LOAD_LIST SET LL_STATE = 2, LL_DONE = CURDATETIME () WHERE LL_FILE = arr[inx][0];
    	}
        arr_done:
          log_enable (tx_mode, 1);
    
    
          -- NOTE(review): a fixed 100 is subtracted per batch, not the number of
          -- files actually loaded
          IF (max_files IS NOT NULL) max_files := max_files - 100;
    
          COMMIT WORK;
        }
    
     looks_empty:
      COMMIT WORK;
      log_message ('No more files to load. Loader has finished,');
      RETURN;
    
    }
    ;
    
    CREATE PROCEDURE 
    rdf_load_stop (IN force INT := 0)
    {
      -- Asks running loaders to stop by inserting the ##stop marker row into
      -- DB.DBA.LOAD_LIST. rdf_loader_run checks for that row at the start of
      -- every batch.
      --
      --   force  when non-zero, txn_killall (1) is additionally issued
      --          through cl_exec
      INSERT INTO DB.DBA.LOAD_LIST (LL_FILE)
        VALUES ('##stop');
      COMMIT WORK;

      IF (force)
        {
          cl_exec ('txn_killall (1)');
        }
    }
    ;
    
    
    CREATE PROCEDURE 
    RDF_LOADER_RUN_1 (IN x INT, IN y INT)
    {
      -- Positional wrapper around rdf_loader_run, used as the target of the
      -- async queue request in rdf_ld_srv.
      --
      --   x  forwarded as max_files
      --   y  forwarded as log_enable
      rdf_loader_run (
        x,
        y);
    }
    ;
    
    CREATE PROCEDURE 
    rdf_ld_srv (IN log_enable INT)
    {
      -- Runs one loader through an async queue of size 1 and waits for it to
      -- finish. The loader is started without a file limit (max_files NULL).
      --
      --   log_enable  logging mode forwarded to rdf_loader_run
      DECLARE loader_queue ANY;

      loader_queue := async_queue (1);
      aq_request (
        loader_queue,
        'DB.DBA.RDF_LOADER_RUN_1',
        VECTOR (NULL, log_enable));
      aq_wait_all (loader_queue);
    }
    ;
    
    
    CREATE PROCEDURE 
    load_grdf (IN f VARCHAR)
    {
      -- Loads a .grdf file, which is read as alternating lines: a graph IRI
      -- line followed by a line of RDF/XML that is loaded into that graph.
      --
      --   f  path of the file, opened with gz_file_open when the name ends in
      --      .gz and with file_open otherwise
      --
      -- Reading stops as soon as ses_read_line yields 0 for either line of a
      -- pair. Presumably that is its end-of-input result with these
      -- arguments -- confirm against the ses_read_line documentation.
      DECLARE graph_iri VARCHAR;
      DECLARE rdfxml_text, input_ses ANY;
      DECLARE pair_count INT;

      pair_count := 0;
      rdfxml_text := '';

      IF (f LIKE '%.gz')
        input_ses := gz_file_open (f);
      ELSE
        input_ses := file_open (f);

      WHILE (rdfxml_text <> 0)
        {
          graph_iri := ses_read_line (input_ses, 0, 0, 1);
          IF (graph_iri = 0)
            RETURN;

          rdfxml_text := ses_read_line (input_ses, 0, 0, 1);
          IF (rdfxml_text = 0)
            RETURN;

          DB.DBA.RDF_LOAD_RDFXML (rdfxml_text, graph_iri, graph_iri);
          -- number of pairs loaded so far, kept for parity with the original
          pair_count := pair_count + 1;
        }
    }
    ;
    
    -- cl_exec ('set lock_escalation_pct = 110');
    -- cl_exec ('DB.DBA.RDF_LD_SRV (1)') &
    -- cl_exec ('DB.DBA.RDF_LD_SRV (2)') &