%META:TOPICPARENT{name="VOSArticles"}%
---++Loading data to LOD instance
To load the RDF data to the LOD instance, perform the following steps:
1 Configure & start cluster
1 Execute the file, clrdfinx2.sql
:
--
-- $Id: clrdfinx2.sql,v 1.1.2.5 2009-05-07 08:49:49 mitko Exp $
--
-- Alternate RDF index scheme for cases where G unspecified
--
-- This file is part of the OpenLink Software Virtuoso Open-Source (VOS)
-- project.
--
-- Copyright (C) 1998-2009 OpenLink Software
--
-- This project is free software; you can redistribute it and/or modify it
-- under the terms of the GNU General Public License as published by the
-- Free Software Foundation; only version 2 of the License, dated June 1991.
--
-- This program is distributed in the hope that it will be useful, but
-- WITHOUT ANY WARRANTY; without even the implied warranty of
-- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-- General Public License for more details.
--
-- You should have received a copy of the GNU General Public License along
-- with this program; if not, write to the Free Software Foundation, Inc.,
-- 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
--
--
DROP INDEX RDF_QUAD_OGPS;
CHECKPOINT;
CREATE TABLE R2
(
G iri_id_8,
S iri_id_8,
P iri_id_8,
O ANY,
PRIMARY KEY (S, P, O, G)
)
ALTER INDEX R2
ON R2
PARTITION ( S INT (0hexffff00) );
LOG_ENABLE (2);
INSERT INTO R2 (G, S, P, O) SELECT G, S, P, O FROM rdf_quad;
DROP TABLE RDF_QUAD;
ALTER TABLE r2 RENAME RDF_QUAD;
CHECKPOINT;
CREATE BITMAP INDEX RDF_QUAD_OPGS
ON RDF_QUAD (O, P, G, S)
PARTITION ( O VARCHAR (-1, 0hexffff) );
CREATE BITMAP INDEX RDF_QUAD_POGS
ON RDF_QUAD (P, O, G, S)
PARTITION ( O VARCHAR (-1, 0hexffff) );
CREATE BITMAP INDEX RDF_QUAD_GPOS
ON RDF_QUAD (G, P, O, S)
PARTITION ( O VARCHAR (-1, 0hexffff) );
CHECKPOINT;
1 Execute the file:
CREATE TABLE load_list
(
ll_file VARCHAR,
ll_graph VARCHAR,
ll_state INT DEFAULT 0, -- 0 not started, 1 going, 2 done
ll_started DATETIME,
ll_done DATETIME,
ll_host INT,
ll_work_time INTEGER,
ll_error VARCHAR ,
PRIMARY KEY (ll_file)
)
ALTER INDEX load_list
ON load_list
PARTITION (ll_file VARCHAR)
;
CREATE INDEX ll_state
ON load_list
(
ll_state,
ll_file,
ll_graph
)
PARTITION (ll_state int)
;
CREATE TABLE ldlock (id INT PRIMARY KEY)
ALTER INDEX ldlock ON ldlock PARTITION (id INT)
;
INSERT INTO ldlock
VALUES (0);
CREATE PROCEDURE
ld_dir
(
IN path VARCHAR,
IN mask VARCHAR,
IN graph VARCHAR
)
{
DECLARE ls any;
DECLARE inx int;
ls := sys_dirlist (path, 1);
FOR (inx := 0; inx < LENGTH (ls); inx := inx + 1)
{
IF (ls[inx] LIKE mask)
{
SET ISOLATION = 'serializable';
IF
( NOT
( EXISTS
( SELECT 1
FROM DB.DBA.LOAD_LIST
WHERE LL_FILE = path || '/' || ls[inx] FOR UPDATE
)
)
)
{
DECLARE gfile, cgfile, ngraph VARCHAR;
gfile := path || '/' || REPLACE (ls[inx], '.gz', '') || '.graph';
cgfile := path || '/' || regexp_replace (REPLACE (ls[inx], '.gz', ''), '\\-[0-9]+\\.n', '.n') || '.graph';
IF (file_stat (gfile) <> 0)
ngraph := TRIM (file_to_string (gfile), ' \r\n');
ELSE IF (file_stat (cgfile) <> 0)
ngraph := TRIM (file_to_string (cgfile), ' \r\n');
ELSE IF (file_stat (path || '/' || 'global.graph') <> 0)
ngraph := TRIM (file_to_string (path || '/' || 'global.graph'), ' \r\n');
ELSE
ngraph := graph;
IF (ngraph IS NOT NULL)
{
INSERT INTO DB.DBA.LOAD_LIST (ll_file, ll_graph)
VALUES (path || '/' || ls[inx], ngraph);
}
}
COMMIT WORK;
}
}
}
;
CREATE PROCEDURE
ld_dir_all
(
IN path VARCHAR,
IN mask VARCHAR,
IN graph VARCHAR
)
{
DECLARE ls ANY;
DECLARE inx INT;
ls := sys_dirlist (path, 0);
ld_dir (path, mask, graph);
FOR (inx := 0; inx < LENGTH (ls); inx := inx + 1)
{
IF (ls[inx] <> '.' AND ls[inx] <> '..')
{
ld_dir_all (path||'/'||ls[inx], mask, graph);
}
}
}
;
CREATE PROCEDURE
ld_add (IN _fname VARCHAR, IN _graph VARCHAR)
{
--log_message (sprintf ('ld_add: %s, %s', _fname, _graph));
SET ISOLATION = 'serializable';
IF (NOT (EXISTS (SELECT 1 FROM DB.DBA.LOAD_LIST WHERE LL_FILE = _fname FOR UPDATE)))
{
INSERT INTO DB.DBA.LOAD_LIST (LL_FILE, LL_GRAPH) VALUES (_fname, _graph);
}
COMMIT WORK;
}
;
CREATE PROCEDURE ld_ttlp_flags (IN fname VARCHAR)
{
IF (fname LIKE '%/btc-2009%' OR fname LIKE '%.nq%')
RETURN 255 + 512;
RETURN 255;
}
CREATE PROCEDURE
ld_file (IN f VARCHAR, IN graph VARCHAR)
{
DECLARE gzip_name VARCHAR;
DECLARE EXIT HANDLER FOR SQLSTATE '*' {
ROLLBACK WORK;
UPDATE DB.DBA.LOAD_LIST
SET LL_STATE = 2,
LL_DONE = CURDATETIME (),
LL_ERROR = __sql_state || ' ' || __sql_message
WHERE LL_FILE = f;
COMMIT WORK;
LOG_MESSAGE (sprintf (' File %s error %s %s', f, __sql_state, __sql_message));
RETURN;
};
IF (f LIKE '%.grdf' OR f LIKE '%.grdf.gz')
{
load_grdf (f);
}
ELSE IF (f LIKE '%.gz')
{
gzip_name := regexp_replace (f, '\.gz\x24', '');
IF (gzip_name LIKE '%.xml' OR gzip_name LIKE '%.owl' OR gzip_name LIKE '%.rdf')
DB.DBA.RDF_LOAD_RDFXML (gz_file_open (f), graph, graph);
ELSE IF (gzip_name like '%.n4')
TTLP (gz_file_open (f), graph, graph, 512 + 255);
ELSE
TTLP (gz_file_open (f), graph, graph, ld_ttlp_flags (f));
}
ELSE
{
IF (f LIKE '%.xml' OR f LIKE '%.owl' OR f LIKE '%.rdf')
DB.DBA.RDF_LOAD_RDFXML (file_open (f), graph, graph);
ELSE IF (f LIKE '%.n4')
TTLP (file_open (f), graph, graph, 512 + 255);
ELSE
TTLP (file_open (f), graph, graph, ld_ttlp_flags (f));
}
--log_message (sprintf ('loaded %s', f));
}
;
CREATE PROCEDURE
rdf_load_dir (IN path VARCHAR,
IN mask VARCHAR := '%.nt',
IN graph VARCHAR := 'http://dbpedia.org')
{
DELETE FROM DB.DBA.LOAD_LIST WHERE LL_FILE = '##stop';
COMMIT WORK;
ld_dir (path, mask, graph);
rdf_loader_run ();
}
;
CREATE PROCEDURE ld_array ()
{
DECLARE first, last, arr, len, local ANY;
DECLARE cr CURSOR FOR
SELECT TOP 100 LL_FILE, LL_GRAPH
FROM DB.DBA.LOAD_LIST TABLE OPTION (index ll_state)
WHERE LL_STATE = 0
FOR UPDATE;
DECLARE fill INT;
DECLARE f, g VARCHAR;
DECLARE r ANY;
WHENEVER NOT FOUND GOTO DONE;
first := 0;
last := 0;
arr := make_array (100, 'any');
fill := 0;
open cr;
len := 0;
FOR (;;)
{
FETCH cr INTO f, g;
IF (0 = first) first := f;
last := f;
arr[fill] := VECTOR (f, g);
len := len + cast (file_stat (f, 1) AS INT);
fill := fill + 1;
IF (len > 2000000)
GOTO DONE;
}
DONE:
IF (0 = first)
RETURN 0;
IF (1 <> sys_stat ('cl_run_local_only'))
local := sys_stat ('cl_this_host');
UPDATE load_list SET ll_state = 1, ll_started = curdatetime (), LL_HOST = local
WHERE ll_file >= first AND ll_file <= last;
RETURN arr;
}
;
CREATE PROCEDURE
rdf_loader_run (IN max_files INTEGER := NULL, IN log_enable INT := 2)
{
DECLARE sec_delay FLOAT;
DECLARE _f, _graph VARCHAR;
DECLARE arr ANY;
DECLARE xx, inx, tx_mode, ld_mode INT;
ld_mode := log_enable;
if (0 = sys_stat ('cl_run_local_only'))
{
IF (log_enable = 2 AND cl_this_host () = 1)
{
cl_exec ('checkpoint_interval (0)');
cl_exec ('__dbf_set (''cl_non_logged_write_mode'', 1)');
}
IF (cl_this_host () = 1)
cl_exec('__dbf_set(''cl_max_keep_alives_missed'',3000)');
}
tx_mode := bit_and (1, log_enable);
log_message ('Loader started');
DELETE FROM DB.DBA.LOAD_LIST WHERE LL_FILE = '##stop';
COMMIT WORK;
WHILE (1)
{
SET ISOLATION = 'repeatable';
DECLARE EXIT HANDLER FOR SQLSTATE '40001' {
ROLLBACK WORK;
sec_delay := rnd(1000)*0.001;
log_message(sprintf('deadlock in loader, waiting %d milliseconds', CAST (sec_delay * 1000 AS INTEGER)));
DELAY(sec_delay);
GOTO AGAIN;
};
AGAIN:;
IF (EXISTS (SELECT 1 FROM DB.DBA.LOAD_LIST WHERE LL_FILE = '##stop'))
{
log_message ('File load stopped by rdf_load_stop.');
RETURN;
}
log_enable (tx_mode, 1);
IF (max_files IS NOT NULL AND max_files <= 0)
{
COMMIT WORK;
log_message ('Max_files reached. Finishing.');
RETURN;
}
WHENEVER NOT FOUND GOTO looks_empty;
-- log_message ('Getting next file.');
SET ISOLATION = 'serializable';
SELECT id INTO xx FROM ldlock WHERE id = 0 FOR UPDATE;
arr := ld_array ();
COMMIT WORK;
IF (0 = arr)
GOTO looks_empty;
log_enable (ld_mode, 1);
FOR (inx := 0; inx < 100; inx := inx + 1)
{
IF (0 = arr[inx])
GOTO arr_done;
ld_file (arr[inx][0], arr[inx][1]);
UPDATE DB.DBA.LOAD_LIST
SET LL_STATE = 2,
LL_DONE = CURDATETIME ()
WHERE LL_FILE = arr[inx][0];
}
arr_done:
log_enable (tx_mode, 1);
IF (max_files IS NOT NULL) max_files := max_files - 100;
COMMIT WORK;
}
looks_empty:
COMMIT WORK;
log_message ('No more files to load. Loader has finished,');
RETURN;
}
;
CREATE PROCEDURE rdf_load_stop (IN force INT := 0)
{
INSERT INTO DB.DBA.LOAD_LIST (LL_FILE) VALUES ('##stop');
COMMIT WORK;
IF (FORCE)
cl_exec ('txn_killall (1)');
}
;
CREATE PROCEDURE RDF_LOADER_RUN_1 (IN x INT, IN y INT)
{
rdf_loader_run (x, y);
}
;
CREATE PROCEDURE rdf_ld_srv (IN log_enable INT)
{
DECLARE aq ANY;
aq := async_queue (1);
aq_request (aq, 'DB.DBA.RDF_LOADER_RUN_1', VECTOR (NULL, log_enable));
aq_wait_all (aq);
}
;
CREATE PROCEDURE load_grdf (IN f VARCHAR)
{
DECLARE line ANY;
DECLARE inx INT;
DECLARE ses ANY;
DECLARE gr VARCHAR;
IF (f LIKE '%.gz')
ses := gz_file_open (f);
ELSE
ses := file_open (f);
inx := 0;
line := '';
WHILE (line <> 0)
{
gr := ses_read_line (ses, 0, 0, 1);
IF (gr = 0) RETURN;
line := ses_read_line (ses, 0, 0, 1);
IF (line = 0) RETURN;
DB.DBA.RDF_LOAD_RDFXML (line, gr, gr);
inx := inx + 1;
}
}
;
-- cl_exec ('set lock_escalation_pct = 110');
-- cl_exec ('DB.DBA.RDF_LD_SRV (1)') &
-- cl_exec ('DB.DBA.RDF_LD_SRV (2)') &
1 Execute:
cl_exec ('checkpoint);
1 Execute ld_dir ('directory' , 'mask' , 'graph'), for ex:
ld_dir ('/dbs/data', '*.gz', 'http://dbpedia.org');
1 Execute on every node with separate client:
rdf_loader_run();