<docbook><section><title>VirtBulkRDFLoaderScript</title><subtitle>Loading the Bulk Loader Procedure and Sub-procedures into Virtuoso</subtitle>
<bridgehead class="http://www.w3.org/1999/xhtml:h2"> How To Load the Procedures</bridgehead>
<para>The command line <emphasis><ulink url="http://docs.openlinksw.com/virtuoso/isql.html">isql</ulink></emphasis> program is the easiest way to load the Bulk Loader Procedure and Sub-procedures into a Virtuoso instance.</para>
<para>This can be done by saving the SQL script below into a file (rdfloader.sql, for example) and using the isql <emphasis>LOAD</emphasis> command as shown here:</para>
<programlisting>$ /opt/virtuoso/bin/isql 1111
Connected to OpenLink Virtuoso
Driver: 06.01.3127 OpenLink Virtuoso ODBC Driver
OpenLink Interactive SQL (Virtuoso), version 0.9849b.
Type HELP; for help and EXIT; to exit.

SQL&gt; LOAD rdfloader.sql;

Done. -- 5032 msec.

Done. -- 999 msec.

Done. -- 466 msec.

Done. -- 10 msec.

Done. -- 1297 msec.

Done. -- 101 msec.

Done. -- 94 msec.

Done. -- 81 msec.

Done. -- 105 msec.

Done. -- 440 msec.

Done. -- 69 msec.

Done. -- 145 msec.

Done. -- 443 msec.

Done. -- 81 msec.

Done. -- 60 msec.

Done. -- 121 msec.

Done. -- 103 msec.
SQL&gt; 
</programlisting><para> </para>
<bridgehead class="http://www.w3.org/1999/xhtml:h2"> Bulk Loader Procedure and Sub-procedures creation SQL script</bridgehead>
<programlisting>CREATE TABLE load_list (
  -- Work queue of the bulk loader: one row per file to be loaded.
  -- ld_dir and ld_add insert rows, ld_array claims them, ld_file and
  -- rdf_loader_run record the outcome.
  -- ll_file is the primary key; ld_dir stores it as path || / || file name.
  -- rdf_load_stop also inserts a marker row whose ll_file is ##stop.
  ll_file      VARCHAR,
  -- Graph value handed to the parser calls in ld_file for this file.
  ll_graph     VARCHAR,
  ll_state     INT DEFAULT 0, -- 0 not started, 1 going, 2 done
  -- Set by ld_array when the row is claimed (state 1).
  ll_started   DATETIME,
  -- Set when the row reaches state 2, both on success and on error.
  ll_done      DATETIME,
  -- Set by ld_array from sys_stat cl_this_host when cl_run_local_only is not 1.
  ll_host      INT,
  -- Not written by any procedure in this script.
  ll_work_time INTEGER,
  -- SQL state and message stored by the error handler in ld_file.
  ll_error     VARCHAR,
  PRIMARY KEY (ll_file))
-- NOTE(review): the PARTITION clauses presumably only matter on a clustered
-- server - confirm against the Virtuoso cluster documentation.
ALTER INDEX load_list ON load_list PARTITION (ll_file VARCHAR)
;

-- Secondary index named explicitly by ld_array (TABLE OPTION (INDEX ll_state))
-- when it selects the rows that are still in state 0.
CREATE INDEX ll_state ON load_list (ll_state, ll_file, ll_graph) PARTITION (ll_state INT)
;


-- Single-row table used as a lock: rdf_loader_run selects the row with
-- id = 0 FOR UPDATE before it calls ld_array, so concurrent loaders pick
-- their batches one after another.
CREATE TABLE ldlock (id INT PRIMARY KEY)
  ALTER INDEX ldlock ON ldlock PARTITION (id INT)
;

-- Seed the row the loaders lock on.  The target column is named explicitly
-- so the statement does not depend on the column order of the table.
INSERT INTO ldlock (id) VALUES (0);


-- ld_dir: registers the files of one directory in DB.DBA.LOAD_LIST.
--   path  - directory to scan (not recursive; see ld_dir_all)
--   mask  - LIKE pattern the file name must match
--   graph - fallback graph used when no .graph file applies
-- Files already present in LOAD_LIST are left untouched.  The graph for a
-- new file is taken from the first of these that exists:
--   1. the file name with .gz removed, plus .graph
--   2. the same name with a -digits part before .n removed, plus .graph
--   3. global.graph in the same directory
--   4. the graph argument
-- A file whose resolved graph is NULL is not registered.  Each matching
-- file is handled in its own serializable transaction.
CREATE PROCEDURE
ld_dir (IN path VARCHAR, IN mask VARCHAR, IN graph VARCHAR)
{
  DECLARE entries ANY;
  DECLARE pos INT;
  DECLARE full_name, stem, own_graph_file, shared_graph_file, global_graph_file, target VARCHAR;

  entries := sys_dirlist (path, 1);
  pos := 0;
  WHILE (pos &lt; LENGTH (entries))
    {
      IF (entries[pos] LIKE mask)
        {
          SET ISOLATION = &#39;serializable&#39;;
          full_name := path || &#39;/&#39; || entries[pos];

          IF (NOT EXISTS (SELECT 1 FROM DB.DBA.LOAD_LIST WHERE LL_FILE = full_name FOR UPDATE))
            {
              stem := REPLACE (entries[pos], &#39;.gz&#39;, &#39;&#39;);
              own_graph_file := path || &#39;/&#39; || stem || &#39;.graph&#39;;
              shared_graph_file := path || &#39;/&#39; || regexp_replace (stem, &#39;\\-[0-9]+\\.n&#39;, &#39;.n&#39;) || &#39;.graph&#39;;
              global_graph_file := path || &#39;/&#39; || &#39;global.graph&#39;;

              IF (file_stat (own_graph_file) &lt;&gt; 0)
                target := TRIM (file_to_string (own_graph_file), &#39; \r\n&#39;);
              ELSE IF (file_stat (shared_graph_file) &lt;&gt; 0)
                target := TRIM (file_to_string (shared_graph_file), &#39; \r\n&#39;);
              ELSE IF (file_stat (global_graph_file) &lt;&gt; 0)
                target := TRIM (file_to_string (global_graph_file), &#39; \r\n&#39;);
              ELSE
                target := graph;

              IF (target IS NOT NULL)
                INSERT INTO DB.DBA.LOAD_LIST (ll_file, ll_graph) VALUES (full_name, target);
            }

          COMMIT WORK;
        }
      pos := pos + 1;
    }
}
;


-- rdf_read_dir: alternative name for ld_dir.
--   path  - directory to scan
--   mask  - LIKE pattern the file name must match
--   graph - fallback graph for files without an applicable .graph file
-- Registers the matching files in DB.DBA.LOAD_LIST without loading them.
CREATE PROCEDURE
rdf_read_dir (IN path VARCHAR, IN mask VARCHAR, IN graph VARCHAR)
{
  -- Delegate to ld_dir, the directory scanner defined in this script.
  ld_dir (path, mask, graph);
}
;

-- ld_dir_all: runs ld_dir on path and then on every entry returned by
-- sys_dirlist (path, 0), recursively.  The entries . and .. are skipped.
-- NOTE(review): sys_dirlist with second argument 0 presumably lists
-- sub-directories - confirm against the function reference.
--   path, mask, graph - passed unchanged to ld_dir at every level
CREATE PROCEDURE
ld_dir_all (IN path VARCHAR, IN mask VARCHAR, IN graph VARCHAR)
{
  DECLARE subdirs ANY;
  DECLARE pos INT;

  -- The listing is taken before the files of this level are registered.
  subdirs := sys_dirlist (path, 0);
  ld_dir (path, mask, graph);

  pos := 0;
  WHILE (pos &lt; LENGTH (subdirs))
    {
      IF (NOT (subdirs[pos] = &#39;.&#39; OR subdirs[pos] = &#39;..&#39;))
        ld_dir_all (path || &#39;/&#39; || subdirs[pos], mask, graph);
      pos := pos + 1;
    }
}
;

-- ld_add: registers a single file in DB.DBA.LOAD_LIST.
--   _fname - value stored in LL_FILE
--   _graph - value stored in LL_GRAPH
-- Does nothing when a row for _fname already exists.  The check and the
-- insert run in one serializable transaction that is committed on exit.
CREATE PROCEDURE
ld_add (IN _fname VARCHAR, IN _graph VARCHAR)
{
  SET ISOLATION = &#39;serializable&#39;;

  -- Already registered: just end the transaction.
  IF (EXISTS (SELECT 1 FROM DB.DBA.LOAD_LIST WHERE LL_FILE = _fname FOR UPDATE))
    {
      COMMIT WORK;
      RETURN;
    }

  INSERT INTO DB.DBA.LOAD_LIST (LL_FILE, LL_GRAPH) VALUES (_fname, _graph);
  COMMIT WORK;
}
;

-- ld_ttlp_flags: returns the flags value that ld_file passes to TTLP.
--   fname - file name, with any .gz suffix already removed by the caller
-- Returns 255, plus 512 when the name contains /btc-2009 or .nq, or ends
-- in .n4.
CREATE PROCEDURE
ld_ttlp_flags (IN fname VARCHAR)
{
  DECLARE flags INT;

  flags := 255;
  IF (fname LIKE &#39;%/btc-2009%&#39; OR fname LIKE &#39;%.nq%&#39; OR fname LIKE &#39;%.n4&#39;)
    flags := flags + 512;
  RETURN flags;
}
;

-- ld_file: loads one file into the given graph.
--   f     - file name as stored in LL_FILE
--   graph - graph value passed to the parser
-- Dispatch by file name:
--   *.grdf, *.grdf.gz           -> load_grdf (the graph argument is not used)
--   *.xml, *.owl, *.rdf (+ .gz) -> DB.DBA.RDF_LOAD_RDFXML
--   anything else (+ .gz)       -> TTLP with the flags from ld_ttlp_flags
-- Any SQL error rolls back, marks the LOAD_LIST row as state 2 with the
-- error text in LL_ERROR, writes a log message and returns normally.
CREATE PROCEDURE
ld_file (IN f VARCHAR, IN graph VARCHAR)
{
  DECLARE base_name VARCHAR;
  DECLARE src ANY;
  DECLARE exit handler FOR sqlstate &#39;*&#39; {
    ROLLBACK WORK;
    UPDATE DB.DBA.LOAD_LIST
      SET LL_STATE = 2,
          LL_DONE = CURDATETIME (),
          LL_ERROR = __sql_state || &#39; &#39; || __sql_message
      WHERE LL_FILE = f;
    COMMIT WORK;

    log_message (sprintf (&#39; File %s error %s %s&#39;, f, __sql_state, __sql_message));
    RETURN;
  };

  IF (f LIKE &#39;%.grdf&#39; OR f LIKE &#39;%.grdf.gz&#39;)
    {
      load_grdf (f);
      RETURN;
    }

  -- Pick the input session and the name used for format detection.
  IF (f LIKE &#39;%.gz&#39;)
    {
      base_name := regexp_replace (f, &#39;\.gz\x24&#39;, &#39;&#39;);
      src := gz_file_open (f);
    }
  ELSE
    {
      base_name := f;
      src := file_open (f);
    }

  IF (base_name LIKE &#39;%.xml&#39; OR base_name LIKE &#39;%.owl&#39; OR base_name LIKE &#39;%.rdf&#39;)
    DB.DBA.RDF_LOAD_RDFXML (src, graph, graph);
  ELSE
    TTLP (src, graph, graph, ld_ttlp_flags (base_name));
}
;

-- rdf_load_dir: registers the files of one directory and loads them.
--   path  - directory to scan
--   mask  - LIKE pattern for file names, default %.nt
--   graph - fallback graph, default http://dbpedia.org
-- Removes a pending ##stop marker, registers the files with ld_dir and then
-- runs rdf_loader_run with its default arguments.
CREATE PROCEDURE
rdf_load_dir (
  IN path VARCHAR,
  IN mask VARCHAR := &#39;%.nt&#39;,
  IN graph VARCHAR := &#39;http://dbpedia.org&#39;)
{
  DELETE
    FROM DB.DBA.LOAD_LIST
    WHERE LL_FILE = &#39;##stop&#39;;
  COMMIT WORK;

  ld_dir (path, mask, graph);
  rdf_loader_run ();
}
;


-- ld_array: claims the next batch of files for the calling loader.
-- Reads up to 100 rows in state 0 through the ll_state index and stops early
-- once the summed file_stat (f, 1) values exceed 2000000.
-- NOTE(review): file_stat (f, 1) presumably returns the file size in bytes -
-- confirm against the function reference.
-- Returns 0 when no row is in state 0.  Otherwise returns a 100-slot array
-- whose leading slots hold VECTOR (file, graph); the remaining slots keep the
-- value make_array created them with, which rdf_loader_run treats as the end
-- of the batch.  The claimed rows are set to state 1 with ll_started and
-- ll_host filled in.  The caller is expected to hold the ldlock row and to
-- commit afterwards.
CREATE PROCEDURE
ld_array ()
{
  DECLARE first, last, arr, len, local ANY;
  DECLARE cr CURSOR FOR
      SELECT TOP 100 LL_FILE, LL_GRAPH
        FROM DB.DBA.LOAD_LIST TABLE OPTION (INDEX ll_state)
        WHERE LL_STATE = 0
        FOR UPDATE;
  DECLARE fill INT;
  DECLARE f, g VARCHAR;
  WHENEVER NOT FOUND GOTO done;
  first := 0;
  last := 0;
  -- ll_host stays NULL unless the host number is looked up below.
  local := NULL;
  arr := make_array (100, &#39;any&#39;);
  fill := 0;
  OPEN cr;
  len := 0;
  FOR (;;)
    {
      FETCH cr INTO f, g;
      IF (0 = first) first := f;
      last := f;
      arr[fill] := VECTOR (f, g);
      len := len + CAST (file_stat (f, 1) AS INT);
      fill := fill + 1;
      IF (len &gt; 2000000)
        GOTO done;
    }
 done:
  IF (0 = first)
    RETURN 0;
  IF (1 &lt;&gt; sys_stat (&#39;cl_run_local_only&#39;))
    local := sys_stat (&#39;cl_this_host&#39;);
  -- Restrict the range update to rows still in state 0: files that sort
  -- between first and last but are already going or done must keep their
  -- state instead of being flipped back to 1.
  UPDATE DB.DBA.LOAD_LIST SET ll_state = 1, ll_started = CURDATETIME (), LL_HOST = local
    WHERE ll_file &gt;= first AND ll_file &lt;= last AND ll_state = 0;
  RETURN arr;
}
;

CREATE PROCEDURE
rdf_loader_run (IN max_files INTEGER := NULL, IN log_enable INT := 2)
{
  -- Main loader loop: repeatedly claims a batch of files through ld_array and
  -- loads each one with ld_file.  It ends when ld_array returns 0, when a row
  -- with LL_FILE = ##stop exists in DB.DBA.LOAD_LIST, or when max_files has
  -- dropped to zero or below.
  --   max_files  - optional budget, NULL for no limit.  It is reduced by 100
  --                after every batch, whatever number of files the batch held.
  --   log_enable - value passed to log_enable () while files are loaded; its
  --                lowest bit (tx_mode) is used around the bookkeeping.
  -- NOTE(review): because the budget is only checked between batches and is
  -- charged 100 per batch, a small max_files can still load a whole batch -
  -- confirm this is intended.
  DECLARE sec_delay float;
  DECLARE _f, _graph VARCHAR;
  DECLARE arr ANY;
  DECLARE xx, inx, tx_mode, ld_mode INT;
  ld_mode := log_enable;
  -- NOTE(review): cl_run_local_only = 0 presumably means the server runs as
  -- part of a cluster - confirm.  Only host 1 issues the cl_exec settings.
  IF (0 = sys_stat (&#39;cl_run_local_only&#39;))
    {
      IF (log_enable = 2 AND cl_this_host () = 1)
	{
	  cl_exec (&#39;checkpoint_interval (0)&#39;);
	  cl_exec (&#39;__dbf_set (&#39;&#39;cl_non_logged_write_mode&#39;&#39;, 1)&#39;);
	}
      IF (cl_this_host () = 1)
	cl_exec(&#39;__dbf_set(&#39;&#39;cl_max_keep_alives_missed&#39;&#39;,3000)&#39;);
    }
  tx_mode := bit_and (1, log_enable);
  log_message (&#39;Loader started&#39;);

  -- Drop a stop marker left by an earlier rdf_load_stop call so that this run
  -- is not ended by it.
  DELETE FROM DB.DBA.LOAD_LIST WHERE LL_FILE = &#39;##stop&#39;;
  COMMIT WORK;

  WHILE (1)
    {
      SET ISOLATION = &#39;repeatable&#39;;
      -- On SQL state 40001 (logged below as a deadlock): roll back, wait for
      -- RND(1000)*0.001 seconds and retry from the label again.
      DECLARE exit handler FOR sqlstate &#39;40001&#39; {
	ROLLBACK WORK;
        sec_delay := RND(1000)*0.001;
	log_message(sprintf(&#39;deadlock in loader, waiting %d milliseconds&#39;, CAST (sec_delay * 1000 AS INTEGER)));
	delay(sec_delay);
	GOTO again;
      };

     again:;

      -- rdf_load_stop inserts this marker row to ask running loaders to end.
      IF (EXISTS (SELECT 1 FROM DB.DBA.LOAD_LIST WHERE LL_FILE = &#39;##stop&#39;))
	{
	  log_message (&#39;File load stopped by rdf_load_stop.&#39;);
	  RETURN;
	}

      log_enable (tx_mode, 1);

      IF (max_files IS NOT NULL AND max_files &lt;= 0)
        {
	  COMMIT WORK;
	  log_message (&#39;Max_files reached. Finishing.&#39;);
          RETURN;
	}

      WHENEVER NOT FOUND GOTO looks_empty;

      --      log_message (&#39;Getting next file.&#39;);
      -- Take the ldlock row FOR UPDATE under serializable isolation before
      -- claiming a batch; the COMMIT after ld_array ends that transaction.
      SET ISOLATION = &#39;serializable&#39;;
      SELECT id INTO xx FROM ldlock WHERE id = 0 FOR UPDATE;
      arr := ld_array ();
      COMMIT WORK;
      -- ld_array returns 0 when no row is left in state 0.
      IF (0 = arr)
	GOTO looks_empty;
      log_enable (ld_mode, 1);

      -- Walk the 100-slot batch array; a slot equal to 0 ends the batch.
      FOR (inx := 0; inx &lt; 100; inx := inx + 1)
	{
	  IF (0 = arr[inx])
	    GOTO arr_done;
	  ld_file (arr[inx][0], arr[inx][1]);
	  -- Mark the file as done.  ld_file returns normally after an error as
	  -- well, having stored the error text itself.
	  UPDATE DB.DBA.LOAD_LIST SET LL_STATE = 2, LL_DONE = CURDATETIME () WHERE LL_FILE = arr[inx][0];
	}
    arr_done:
      log_enable (tx_mode, 1);


      -- The budget is charged a flat 100 per batch.
      IF (max_files IS NOT NULL) max_files := max_files - 100;

      COMMIT WORK;
    }

 looks_empty:
  COMMIT WORK;
  log_message (&#39;No more files to load. Loader has finished,&#39;);
  RETURN;

}
;

-- rdf_load_stop: asks running loaders to stop.
--   force - when non-zero, additionally runs txn_killall (1) through cl_exec
-- Places a row with LL_FILE = ##stop in DB.DBA.LOAD_LIST; rdf_loader_run
-- checks for that row at the top of every batch and returns when it finds it.
-- The procedure may be called repeatedly: the marker is only inserted when it
-- is not there yet, so a later call with force = 1 still reaches the kill
-- step instead of failing on the primary key of LOAD_LIST.
CREATE PROCEDURE
rdf_load_stop (IN force INT := 0)
{
  IF (NOT EXISTS (SELECT 1 FROM DB.DBA.LOAD_LIST WHERE LL_FILE = &#39;##stop&#39;))
    INSERT INTO DB.DBA.LOAD_LIST (LL_FILE) VALUES (&#39;##stop&#39;);
  COMMIT WORK;
  IF (force)
    cl_exec (&#39;txn_killall (1)&#39;);
}
;


CREATE PROCEDURE 
RDF_LOADER_RUN_1 (IN x INT, IN y INT)
{
  -- Two-argument wrapper around rdf_loader_run, requested by name from
  -- rdf_ld_srv through aq_request.
  --   x - passed as max_files
  --   y - passed as log_enable
  rdf_loader_run (x, y);
}
;

-- rdf_ld_srv: runs one loader through an async queue and waits for it.
--   log_enable - passed on as the second argument of RDF_LOADER_RUN_1;
--                the first argument (max_files) is NULL
CREATE PROCEDURE
rdf_ld_srv (IN log_enable INT)
{
  DECLARE queue, run_args ANY;

  run_args := VECTOR (NULL, log_enable);
  queue := async_queue (1);
  aq_request (queue, &#39;DB.DBA.RDF_LOADER_RUN_1&#39;, run_args);
  aq_wait_all (queue);
}
;


-- load_grdf: loads a file that is read as pairs of lines, the first line of a
-- pair giving the graph and the second the text handed to
-- DB.DBA.RDF_LOAD_RDFXML for that graph.
--   f - file name; opened with gz_file_open when it ends in .gz, otherwise
--       with file_open
-- Returns as soon as ses_read_line yields 0 for either line of a pair.
CREATE PROCEDURE
load_grdf (IN f VARCHAR)
{
  DECLARE src, payload ANY;
  DECLARE graph_iri VARCHAR;

  IF (f LIKE &#39;%.gz&#39;)
    src := gz_file_open (f);
  ELSE
    src := file_open (f);

  payload := &#39;&#39;;
  WHILE (payload &lt;&gt; 0)
    {
      graph_iri := ses_read_line (src, 0, 0, 1);
      IF (graph_iri = 0)
        RETURN;

      payload := ses_read_line (src, 0, 0, 1);
      IF (payload = 0)
        RETURN;

      DB.DBA.RDF_LOAD_RDFXML (payload, graph_iri, graph_iri);
    }
}
;

-- cl_exec (&#39;set lock_escalation_pct = 110&#39;);
-- cl_exec (&#39;DB.DBA.RDF_LD_SRV (1)&#39;) &amp;
-- cl_exec (&#39;DB.DBA.RDF_LD_SRV (2)&#39;) &amp;
</programlisting></section></docbook>