Loading the Bulk Loader Procedure and Sub-procedures into Virtuoso

How To Load the Procedures

The command line isql program is the easiest way to load the Bulk Loader Procedure and Sub-procedures into a Virtuoso instance.

This can be done by saving the SQL script below into a file (rdfloader.sql, for example) and using the isql LOAD command as shown here:


$ /opt/virtuoso/bin/isql 1111
Connected to OpenLink Virtuoso
Driver: 06.01.3127 OpenLink Virtuoso ODBC Driver
OpenLink Interactive SQL (Virtuoso), version 0.9849b.
Type HELP; for help and EXIT; to exit.

SQL> LOAD rdfloader.sql;

Done. -- 5032 msec.

Done. -- 999 msec.

Done. -- 466 msec.

Done. -- 10 msec.

Done. -- 1297 msec.

Done. -- 101 msec.

Done. -- 94 msec.

Done. -- 81 msec.

Done. -- 105 msec.

Done. -- 440 msec.

Done. -- 69 msec.

Done. -- 145 msec.

Done. -- 443 msec.

Done. -- 81 msec.

Done. -- 60 msec.

Done. -- 121 msec.

Done. -- 103 msec.
SQL> 

Bulk Loader Procedure and Sub-procedures creation SQL script


CREATE TABLE load_list (
  ll_file      VARCHAR,
  ll_graph     VARCHAR,
  ll_state     INT DEFAULT 0, -- 0 not started, 1 going, 2 done
  ll_started   DATETIME,
  ll_done      DATETIME,
  ll_host      INT,
  ll_work_time INTEGER,
  ll_error     VARCHAR,
  PRIMARY KEY (ll_file))
ALTER INDEX load_list ON load_list PARTITION (ll_file VARCHAR)
;

CREATE INDEX ll_state ON load_list (ll_state, ll_file, ll_graph) PARTITION (ll_state INT)
;


CREATE TABLE ldlock (id INT PRIMARY KEY)
  ALTER INDEX ldlock ON ldlock PARTITION (id INT)
;

INSERT INTO ldlock VALUES (0);


CREATE PROCEDURE
ld_dir (IN path VARCHAR, IN mask VARCHAR, IN graph VARCHAR)
{
  DECLARE ls ANY;
  DECLARE inx INT;
  ls := sys_dirlist (path, 1);
  FOR (inx := 0; inx < LENGTH (ls); inx := inx + 1)
    {
      IF (ls[inx] LIKE mask)
	{
	  SET ISOLATION = 'serializable';

	  IF (NOT (EXISTS (SELECT 1 FROM DB.DBA.LOAD_LIST WHERE LL_FILE = path || '/' || ls[inx] FOR UPDATE)))
	    {
	      DECLARE gfile, cgfile, ngraph VARCHAR;
	      gfile := path || '/' || REPLACE (ls[inx], '.gz', '') || '.graph';
	      cgfile := path || '/' || regexp_replace (REPLACE (ls[inx], '.gz', ''), '\\-[0-9]+\\.n', '.n') || '.graph';
	      IF (file_stat (gfile) <> 0)
		ngraph := TRIM (file_to_string (gfile), ' \r\n');
              ELSE IF (file_stat (cgfile) <> 0)
		ngraph := TRIM (file_to_string (cgfile), ' \r\n');
	      ELSE IF (file_stat (path || '/' || 'global.graph') <> 0)
		ngraph := TRIM (file_to_string (path || '/' || 'global.graph'), ' \r\n');
	      ELSE
	        ngraph := graph;	
              IF (ngraph IS NOT NULL)
                {		
		  INSERT INTO DB.DBA.LOAD_LIST (ll_file, ll_graph) VALUES (path || '/' || ls[inx], ngraph);
		}
	    }

	  COMMIT WORK;
	}
    }
}
;


CREATE PROCEDURE
rdf_read_dir (IN path VARCHAR, IN mask VARCHAR, IN graph VARCHAR)
{
  ld_dirr (path, mask, graph);
}
;

CREATE PROCEDURE 
ld_dir_all (IN path VARCHAR, IN mask VARCHAR, IN graph VARCHAR)
{
  DECLARE ls ANY;
  DECLARE inx INT;
  ls := sys_dirlist (path, 0);
  ld_dir (path, mask, graph);
  FOR (inx := 0; inx < LENGTH (ls); inx := inx + 1)
    {
      IF (ls[inx] <> '.' AND ls[inx] <> '..')
	{
	  ld_dir_all (path||'/'||ls[inx], mask, graph);
	}
    }
}
;

CREATE PROCEDURE
ld_add (IN _fname VARCHAR, IN _graph VARCHAR)
{
  --log_message (sprintf ('ld_add: %s, %s', _fname, _graph));

  SET ISOLATION = 'serializable';

  IF (NOT (EXISTS (SELECT 1 FROM DB.DBA.LOAD_LIST WHERE LL_FILE = _fname FOR UPDATE)))
    {
      INSERT INTO DB.DBA.LOAD_LIST (LL_FILE, LL_GRAPH) VALUES (_fname, _graph);
    }
  COMMIT WORK;
}
;

CREATE PROCEDURE 
ld_ttlp_flags (IN fname VARCHAR)
{
  IF (fname LIKE '%/btc-2009%' OR fname LIKE '%.nq%' OR fname LIKE '%.n4')
    RETURN 255 + 512;
  RETURN 255;
}
;

CREATE PROCEDURE
ld_file (IN f VARCHAR, IN graph VARCHAR)
{
  DECLARE gzip_name VARCHAR;
  DECLARE exit handler FOR sqlstate '*' {
    ROLLBACK WORK;
    UPDATE DB.DBA.LOAD_LIST
      SET LL_STATE = 2,
          LL_DONE = CURDATETIME (),
          LL_ERROR = __sql_state || ' ' || __sql_message
      WHERE LL_FILE = f;
    COMMIT WORK;

    log_message (sprintf (' File %s error %s %s', f, __sql_state, __sql_message));
    RETURN;
  };

  IF (f LIKE '%.grdf' OR f LIKE '%.grdf.gz')
    {
      load_grdf (f);
    }
  ELSE IF (f LIKE '%.gz')
    {
      gzip_name := regexp_replace (f, '\.gz\x24', '');
      IF (gzip_name LIKE '%.xml' OR gzip_name LIKE '%.owl' OR gzip_name LIKE '%.rdf')
	DB.DBA.RDF_LOAD_RDFXML (gz_file_open (f), graph, graph);
      ELSE
	TTLP (gz_file_open (f), graph, graph, ld_ttlp_flags (gzip_name));
    }
  ELSE
    {
      IF (f LIKE '%.xml' OR f LIKE '%.owl' OR f LIKE '%.rdf')
	DB.DBA.RDF_LOAD_RDFXML (file_open (f), graph, graph);
      ELSE
	TTLP (file_open (f), graph, graph, ld_ttlp_flags (f));
    }

  --log_message (sprintf ('loaded %s', f));
}
;

CREATE PROCEDURE
rdf_load_dir (IN path VARCHAR,
              IN mask VARCHAR := '%.nt',
              IN graph VARCHAR := 'http://dbpedia.org')
{

  DELETE FROM DB.DBA.LOAD_LIST WHERE LL_FILE = '##stop';
  COMMIT WORK;

  ld_dir (path, mask, graph);

  rdf_loader_run ();
}
;


CREATE PROCEDURE 
ld_array ()
{
  DECLARE first, last, arr, len, local ANY;
  DECLARE cr CURSOR FOR
      SELECT TOP 100 LL_FILE, LL_GRAPH
        FROM DB.DBA.LOAD_LIST TABLE OPTION (INDEX ll_state)
        WHERE LL_STATE = 0
	FOR UPDATE;
  DECLARE fill INT;
  DECLARE f, g VARCHAR;
  DECLARE r ANY;
  WHENEVER NOT FOUND GOTO done;
  first := 0;
  last := 0;
 arr := make_array (100, 'any');
  fill := 0;
  OPEN cr;
  len := 0;
  FOR (;;)
    {
      FETCH cr INTO f, g;
      IF (0 = first) first := f;
      last := f;
      arr[fill] := VECTOR (f, g);
    len := len + CAST (file_stat (f, 1) AS INT);
      fill := fill + 1;
      IF (len > 2000000)
	GOTO done;
    }
 done:
  IF (0 = first)
    RETURN 0;
  IF (1 <> sys_stat ('cl_run_local_only')) 
    local := sys_stat ('cl_this_host');
  UPDATE load_list SET ll_state = 1, ll_started = CURDATETIME (), LL_HOST = local
    WHERE ll_file >= first AND ll_file <= last;
  RETURN arr;
}
;

CREATE PROCEDURE
rdf_loader_run (IN max_files INTEGER := NULL, IN log_enable INT := 2)
{
  DECLARE sec_delay float;
  DECLARE _f, _graph VARCHAR;
  DECLARE arr ANY;
  DECLARE xx, inx, tx_mode, ld_mode INT;
  ld_mode := log_enable;
  IF (0 = sys_stat ('cl_run_local_only'))
    {
      IF (log_enable = 2 AND cl_this_host () = 1)
	{
	  cl_exec ('checkpoint_interval (0)');
	  cl_exec ('__dbf_set (''cl_non_logged_write_mode'', 1)');
	}
      IF (cl_this_host () = 1)
	cl_exec('__dbf_set(''cl_max_keep_alives_missed'',3000)');
    }
  tx_mode := bit_and (1, log_enable);
  log_message ('Loader started');

  DELETE FROM DB.DBA.LOAD_LIST WHERE LL_FILE = '##stop';
  COMMIT WORK;

  WHILE (1)
    {
      SET ISOLATION = 'repeatable';
      DECLARE exit handler FOR sqlstate '40001' {
	ROLLBACK WORK;
        sec_delay := RND(1000)*0.001;
	log_message(sprintf('deadlock in loader, waiting %d milliseconds', CAST (sec_delay * 1000 AS INTEGER)));
	delay(sec_delay);
	GOTO again;
      };

     again:;

      IF (EXISTS (SELECT 1 FROM DB.DBA.LOAD_LIST WHERE LL_FILE = '##stop'))
	{
	  log_message ('File load stopped by rdf_load_stop.');
	  RETURN;
	}

      log_enable (tx_mode, 1);

      IF (max_files IS NOT NULL AND max_files <= 0)
        {
	  COMMIT WORK;
	  log_message ('Max_files reached. Finishing.');
          RETURN;
	}

      WHENEVER NOT FOUND GOTO looks_empty;

      --      log_message ('Getting next file.');
      SET ISOLATION = 'serializable';
      SELECT id INTO xx FROM ldlock WHERE id = 0 FOR UPDATE;
      arr := ld_array ();
      COMMIT WORK;
      IF (0 = arr)
	GOTO looks_empty;
      log_enable (ld_mode, 1);

      FOR (inx := 0; inx < 100; inx := inx + 1)
	{
	  IF (0 = arr[inx])
	    GOTO arr_done;
	  ld_file (arr[inx][0], arr[inx][1]);
	  UPDATE DB.DBA.LOAD_LIST SET LL_STATE = 2, LL_DONE = CURDATETIME () WHERE LL_FILE = arr[inx][0];
	}
    arr_done:
      log_enable (tx_mode, 1);


      IF (max_files IS NOT NULL) max_files := max_files - 100;

      COMMIT WORK;
    }

 looks_empty:
  COMMIT WORK;
  log_message ('No more files to load. Loader has finished,');
  RETURN;

}
;

CREATE PROCEDURE 
rdf_load_stop (IN force INT := 0)
{
  INSERT INTO DB.DBA.LOAD_LIST (LL_FILE) VALUES ('##stop');
  COMMIT WORK;
  IF (force)
    cl_exec ('txn_killall (1)');
}
;


CREATE PROCEDURE 
RDF_LOADER_RUN_1 (IN x INT, IN y INT)
{
  rdf_loader_run (x, y);
}
;

CREATE PROCEDURE 
rdf_ld_srv (IN log_enable INT)
{
  DECLARE aq ANY;
  aq := async_queue (1);
  aq_request (aq, 'DB.DBA.RDF_LOADER_RUN_1', VECTOR (NULL, log_enable));
  aq_wait_all (aq);
}
;


CREATE PROCEDURE 
load_grdf (IN f VARCHAR)
{
  DECLARE line ANY;
  DECLARE inx INT;
  DECLARE ses ANY;
  DECLARE gr VARCHAR;

  IF (f LIKE '%.gz')
    ses := gz_file_open (f);
  ELSE
    ses := file_open (f);
  inx := 0;
  line := '';
  WHILE (line <> 0)
    { 
      gr := ses_read_line (ses, 0, 0, 1);
      IF (gr = 0) RETURN;
      line := ses_read_line (ses, 0, 0, 1);
      IF (line = 0) RETURN;
      DB.DBA.RDF_LOAD_RDFXML (line, gr, gr);
      inx := inx + 1;
    }
}
;

-- cl_exec ('set lock_escalation_pct = 110');
-- cl_exec ('DB.DBA.RDF_LD_SRV (1)') &
-- cl_exec ('DB.DBA.RDF_LD_SRV (2)') &