The command line isql
program is the easiest way to load the Bulk Loader Procedure and Sub-procedures into a Virtuoso instance.
This can be done by saving the SQL script below into a file (rdfloader.sql
, for example) and using the isql
LOAD
command as shown here:
$ /opt/virtuoso/bin/isql 1111 Connected to OpenLink Virtuoso Driver: 06.01.3127 OpenLink Virtuoso ODBC Driver OpenLink Interactive SQL (Virtuoso), version 0.9849b. Type HELP; for help and EXIT; to exit. SQL> LOAD rdfloader.sql; Done. -- 5032 msec. Done. -- 999 msec. Done. -- 466 msec. Done. -- 10 msec. Done. -- 1297 msec. Done. -- 101 msec. Done. -- 94 msec. Done. -- 81 msec. Done. -- 105 msec. Done. -- 440 msec. Done. -- 69 msec. Done. -- 145 msec. Done. -- 443 msec. Done. -- 81 msec. Done. -- 60 msec. Done. -- 121 msec. Done. -- 103 msec. SQL>
CREATE TABLE load_list ( ll_file VARCHAR, ll_graph VARCHAR, ll_state INT DEFAULT 0, -- 0 not started, 1 going, 2 done ll_started DATETIME, ll_done DATETIME, ll_host INT, ll_work_time INTEGER, ll_error VARCHAR, PRIMARY KEY (ll_file)) ALTER INDEX load_list ON load_list PARTITION (ll_file VARCHAR) ; CREATE INDEX ll_state ON load_list (ll_state, ll_file, ll_graph) PARTITION (ll_state INT) ; CREATE TABLE ldlock (id INT PRIMARY KEY) ALTER INDEX ldlock ON ldlock PARTITION (id INT) ; INSERT INTO ldlock VALUES (0); CREATE PROCEDURE ld_dir (IN path VARCHAR, IN mask VARCHAR, IN graph VARCHAR) { DECLARE ls ANY; DECLARE inx INT; ls := sys_dirlist (path, 1); FOR (inx := 0; inx < LENGTH (ls); inx := inx + 1) { IF (ls[inx] LIKE mask) { SET ISOLATION = 'serializable'; IF (NOT (EXISTS (SELECT 1 FROM DB.DBA.LOAD_LIST WHERE LL_FILE = path || '/' || ls[inx] FOR UPDATE))) { DECLARE gfile, cgfile, ngraph VARCHAR; gfile := path || '/' || REPLACE (ls[inx], '.gz', '') || '.graph'; cgfile := path || '/' || regexp_replace (REPLACE (ls[inx], '.gz', ''), '\\-[0-9]+\\.n', '.n') || '.graph'; IF (file_stat (gfile) <> 0) ngraph := TRIM (file_to_string (gfile), ' \r\n'); ELSE IF (file_stat (cgfile) <> 0) ngraph := TRIM (file_to_string (cgfile), ' \r\n'); ELSE IF (file_stat (path || '/' || 'global.graph') <> 0) ngraph := TRIM (file_to_string (path || '/' || 'global.graph'), ' \r\n'); ELSE ngraph := graph; IF (ngraph IS NOT NULL) { INSERT INTO DB.DBA.LOAD_LIST (ll_file, ll_graph) VALUES (path || '/' || ls[inx], ngraph); } } COMMIT WORK; } } } ; CREATE PROCEDURE rdf_read_dir (IN path VARCHAR, IN mask VARCHAR, IN graph VARCHAR) { ld_dirr (path, mask, graph); } ; CREATE PROCEDURE ld_dir_all (IN path VARCHAR, IN mask VARCHAR, IN graph VARCHAR) { DECLARE ls ANY; DECLARE inx INT; ls := sys_dirlist (path, 0); ld_dir (path, mask, graph); FOR (inx := 0; inx < LENGTH (ls); inx := inx + 1) { IF (ls[inx] <> '.' AND ls[inx] <> '..') { ld_dir_all (path||'/'||ls[inx], mask, graph); } } } ; CREATE PROCEDURE ld_add (IN _fname VARCHAR, IN _graph VARCHAR) { --log_message (sprintf ('ld_add: %s, %s', _fname, _graph)); SET ISOLATION = 'serializable'; IF (NOT (EXISTS (SELECT 1 FROM DB.DBA.LOAD_LIST WHERE LL_FILE = _fname FOR UPDATE))) { INSERT INTO DB.DBA.LOAD_LIST (LL_FILE, LL_GRAPH) VALUES (_fname, _graph); } COMMIT WORK; } ; CREATE PROCEDURE ld_ttlp_flags (IN fname VARCHAR) { IF (fname LIKE '%/btc-2009%' OR fname LIKE '%.nq%' OR fname LIKE '%.n4') RETURN 255 + 512; RETURN 255; } ; CREATE PROCEDURE ld_file (IN f VARCHAR, IN graph VARCHAR) { DECLARE gzip_name VARCHAR; DECLARE exit handler FOR sqlstate '*' { ROLLBACK WORK; UPDATE DB.DBA.LOAD_LIST SET LL_STATE = 2, LL_DONE = CURDATETIME (), LL_ERROR = __sql_state || ' ' || __sql_message WHERE LL_FILE = f; COMMIT WORK; log_message (sprintf (' File %s error %s %s', f, __sql_state, __sql_message)); RETURN; }; IF (f LIKE '%.grdf' OR f LIKE '%.grdf.gz') { load_grdf (f); } ELSE IF (f LIKE '%.gz') { gzip_name := regexp_replace (f, '\.gz\x24', ''); IF (gzip_name LIKE '%.xml' OR gzip_name LIKE '%.owl' OR gzip_name LIKE '%.rdf') DB.DBA.RDF_LOAD_RDFXML (gz_file_open (f), graph, graph); ELSE TTLP (gz_file_open (f), graph, graph, ld_ttlp_flags (gzip_name)); } ELSE { IF (f LIKE '%.xml' OR f LIKE '%.owl' OR f LIKE '%.rdf') DB.DBA.RDF_LOAD_RDFXML (file_open (f), graph, graph); ELSE TTLP (file_open (f), graph, graph, ld_ttlp_flags (f)); } --log_message (sprintf ('loaded %s', f)); } ; CREATE PROCEDURE rdf_load_dir (IN path VARCHAR, IN mask VARCHAR := '%.nt', IN graph VARCHAR := 'http://dbpedia.org') { DELETE FROM DB.DBA.LOAD_LIST WHERE LL_FILE = '##stop'; COMMIT WORK; ld_dir (path, mask, graph); rdf_loader_run (); } ; CREATE PROCEDURE ld_array () { DECLARE first, last, arr, len, local ANY; DECLARE cr CURSOR FOR SELECT TOP 100 LL_FILE, LL_GRAPH FROM DB.DBA.LOAD_LIST TABLE OPTION (INDEX ll_state) WHERE LL_STATE = 0 FOR UPDATE; DECLARE fill INT; DECLARE f, g VARCHAR; DECLARE r ANY; WHENEVER NOT FOUND GOTO done; first := 0; last := 0; arr := make_array (100, 'any'); fill := 0; OPEN cr; len := 0; FOR (;;) { FETCH cr INTO f, g; IF (0 = first) first := f; last := f; arr[fill] := VECTOR (f, g); len := len + CAST (file_stat (f, 1) AS INT); fill := fill + 1; IF (len > 2000000) GOTO done; } done: IF (0 = first) RETURN 0; IF (1 <> sys_stat ('cl_run_local_only')) local := sys_stat ('cl_this_host'); UPDATE load_list SET ll_state = 1, ll_started = CURDATETIME (), LL_HOST = local WHERE ll_file >= first AND ll_file <= last; RETURN arr; } ; CREATE PROCEDURE rdf_loader_run (IN max_files INTEGER := NULL, IN log_enable INT := 2) { DECLARE sec_delay float; DECLARE _f, _graph VARCHAR; DECLARE arr ANY; DECLARE xx, inx, tx_mode, ld_mode INT; ld_mode := log_enable; IF (0 = sys_stat ('cl_run_local_only')) { IF (log_enable = 2 AND cl_this_host () = 1) { cl_exec ('checkpoint_interval (0)'); cl_exec ('__dbf_set (''cl_non_logged_write_mode'', 1)'); } IF (cl_this_host () = 1) cl_exec('__dbf_set(''cl_max_keep_alives_missed'',3000)'); } tx_mode := bit_and (1, log_enable); log_message ('Loader started'); DELETE FROM DB.DBA.LOAD_LIST WHERE LL_FILE = '##stop'; COMMIT WORK; WHILE (1) { SET ISOLATION = 'repeatable'; DECLARE exit handler FOR sqlstate '40001' { ROLLBACK WORK; sec_delay := RND(1000)*0.001; log_message(sprintf('deadlock in loader, waiting %d milliseconds', CAST (sec_delay * 1000 AS INTEGER))); delay(sec_delay); GOTO again; }; again:; IF (EXISTS (SELECT 1 FROM DB.DBA.LOAD_LIST WHERE LL_FILE = '##stop')) { log_message ('File load stopped by rdf_load_stop.'); RETURN; } log_enable (tx_mode, 1); IF (max_files IS NOT NULL AND max_files <= 0) { COMMIT WORK; log_message ('Max_files reached. Finishing.'); RETURN; } WHENEVER NOT FOUND GOTO looks_empty; -- log_message ('Getting next file.'); SET ISOLATION = 'serializable'; SELECT id INTO xx FROM ldlock WHERE id = 0 FOR UPDATE; arr := ld_array (); COMMIT WORK; IF (0 = arr) GOTO looks_empty; log_enable (ld_mode, 1); FOR (inx := 0; inx < 100; inx := inx + 1) { IF (0 = arr[inx]) GOTO arr_done; ld_file (arr[inx][0], arr[inx][1]); UPDATE DB.DBA.LOAD_LIST SET LL_STATE = 2, LL_DONE = CURDATETIME () WHERE LL_FILE = arr[inx][0]; } arr_done: log_enable (tx_mode, 1); IF (max_files IS NOT NULL) max_files := max_files - 100; COMMIT WORK; } looks_empty: COMMIT WORK; log_message ('No more files to load. Loader has finished,'); RETURN; } ; CREATE PROCEDURE rdf_load_stop (IN force INT := 0) { INSERT INTO DB.DBA.LOAD_LIST (LL_FILE) VALUES ('##stop'); COMMIT WORK; IF (force) cl_exec ('txn_killall (1)'); } ; CREATE PROCEDURE RDF_LOADER_RUN_1 (IN x INT, IN y INT) { rdf_loader_run (x, y); } ; CREATE PROCEDURE rdf_ld_srv (IN log_enable INT) { DECLARE aq ANY; aq := async_queue (1); aq_request (aq, 'DB.DBA.RDF_LOADER_RUN_1', VECTOR (NULL, log_enable)); aq_wait_all (aq); } ; CREATE PROCEDURE load_grdf (IN f VARCHAR) { DECLARE line ANY; DECLARE inx INT; DECLARE ses ANY; DECLARE gr VARCHAR; IF (f LIKE '%.gz') ses := gz_file_open (f); ELSE ses := file_open (f); inx := 0; line := ''; WHILE (line <> 0) { gr := ses_read_line (ses, 0, 0, 1); IF (gr = 0) RETURN; line := ses_read_line (ses, 0, 0, 1); IF (line = 0) RETURN; DB.DBA.RDF_LOAD_RDFXML (line, gr, gr); inx := inx + 1; } } ; -- cl_exec ('set lock_escalation_pct = 110'); -- cl_exec ('DB.DBA.RDF_LD_SRV (1)') & -- cl_exec ('DB.DBA.RDF_LD_SRV (2)') &