[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

SF.net SVN: ledger-smb: [1975] trunk



Revision: 1975
          http://ledger-smb.svn.sourceforge.net/ledger-smb/?rev=1975&view=rev
Author:   einhverfr
Date:     2007-12-12 14:27:12 -0800 (Wed, 12 Dec 2007)

Log Message:
-----------
Basic outline of Job Queue System

Modified Paths:
--------------
    trunk/LedgerSMB/DBObject/Payment.pm
    trunk/sql/Pg-database.sql
    trunk/sql/modules/Payment.sql
    trunk/sql/modules/Roles.sql
    trunk/sql/modules/Settings.sql

Added Paths:
-----------
    trunk/utils/process_queue/
    trunk/utils/process_queue/config.pl
    trunk/utils/process_queue/process_queue.pl

Modified: trunk/LedgerSMB/DBObject/Payment.pm
===================================================================
--- trunk/LedgerSMB/DBObject/Payment.pm	2007-12-11 23:15:31 UTC (rev 1974)
+++ trunk/LedgerSMB/DBObject/Payment.pm	2007-12-12 22:27:12 UTC (rev 1975)
@@ -385,6 +385,11 @@
 sub post_bulk {
     my ($self) = @_;
     my $total_count = 0;
+    my ($ref) = $self->callproc(
+          procname => 'setting_get', 
+          args     => ['queue_payments'],
+    );
+    my $queue_payments = $ref->{setting_get};
     $self->{payment_date} = $self->{datepaid};
     for my $contact_row (1 .. $self->{contact_count}){
         my $contact_id = $self->{"contact_$contact_row"};
@@ -408,10 +413,20 @@
         }
         $self->{transactions} = $invoice_array;
 	$self->{source} = $self->{"source_$contact_id"};
-        $self->exec_method(funcname => 'payment_bulk_post');
-
+        if ($queue_payments){    # queue_payments setting is on: defer posting
+             my ($job_ref) = $self->exec_method(
+                 funcname => 'job__create'
+             );
+             $self->{job_id} = $job_ref->{job__create};    # id of the new pending job
+             $self->exec_method(    # NOTE(review): Payment.sql in this commit names the function payment_bulk_queue -- confirm
+                 funcname => 'payment_bulk_queue_entry'
+             );
+        } else {    # setting is off: post immediately, as before
+            $self->exec_method(funcname => 'payment_bulk_post');
+        }
     }
-    $self->{dbh}->commit;
+    $self->{queue_payments} = $queue_payments;
+    return $self->{dbh}->commit;
 }
 
 1;

Modified: trunk/sql/Pg-database.sql
===================================================================
--- trunk/sql/Pg-database.sql	2007-12-11 23:15:31 UTC (rev 1974)
+++ trunk/sql/Pg-database.sql	2007-12-12 22:27:12 UTC (rev 1975)
@@ -332,13 +332,7 @@
   setting_key text primary key,
   value text
 );
-/*
-  inventory_accno_id int,
-  income_accno_id int,
-  expense_accno_id int,
-  fxgain_accno_id int,
-  fxloss_accno_id int,
-*/
+
 \COPY defaults FROM stdin WITH DELIMITER |
 sinumber|1
 sonumber|1
@@ -358,7 +352,14 @@
 vendornumber|1
 glnumber|1
 projectnumber|1
+queue_payments|0
+poll_frequency|1
 \.
+
+COMMENT ON TABLE defaults IS $$
+Note that poll_frequency is in seconds.  poll_frequency and queue_payments 
+are not exposed via the admin interface as they are advanced features best
+handled via DBAs.  $$;
 -- */
 CREATE TABLE acc_trans (
   trans_id int NOT NULL REFERENCES transactions(id),
@@ -2695,4 +2696,59 @@
 CREATE INDEX location_city_prov_gist_idx ON location USING gist(city gist_trgm_ops);
 CREATE INDEX entity_name_gist_idx ON entity USING gist(name gist_trgm_ops);
 
+CREATE TABLE pending_job ( -- one row per queued job; also kept as a log of processing
+	id serial not null unique,
+	batch_class int references batch_class(id),
+	entered_by text REFERENCES users(username)
+		not null default SESSION_USER, -- defaults to the inserting session's user
+	entered_at timestamp default now(),
+	batch_id int references batch(id),
+	completed_at timestamp, -- NULL while the job is still pending
+	success bool, -- NULL until an outcome has been recorded
+	error_condition text,
+	CHECK (completed_at IS NULL OR success IS NOT NULL), -- a completed job must record an outcome
+	CHECK (success IS NOT FALSE OR error_condition IS NOT NULL) -- a failed job must carry an error
+);
+COMMENT ON table pending_job IS
+$$ Purpose:  This table stores pending/queued jobs to be processed async.
+Additionally, this functions as a log of all such processing for purposes of 
+internal audits, performance tuning, and the like. $$;
+
+CREATE INDEX pending_job_batch_id_pending ON pending_job(batch_id) where success IS NULL;
+
+CREATE INDEX pending_job_entered_by ON pending_job(entered_by);
+
+CREATE OR REPLACE FUNCTION trigger_pending_job() RETURNS TRIGGER -- row trigger: signals that a job without an outcome exists
+AS
+$$
+BEGIN
+  IF NEW.success IS NULL THEN -- no outcome recorded yet for this row
+    NOTIFY job_entered; -- channel that process_queue.pl LISTENs on
+  END IF;
+  RETURN NEW; -- pass the row through unchanged
+END;
+$$ LANGUAGE PLPGSQL;
+
+CREATE TRIGGER notify_pending_jobs BEFORE INSERT OR UPDATE ON pending_job
+FOR EACH ROW EXECUTE PROCEDURE trigger_pending_job();
+
+CREATE TABLE payments_queue ( -- holding rows for queued bulk payments; columns mirror the payment_bulk_queue() arguments
+	transactions numeric[], 
+	batch_id int, 
+	source text, 
+	total numeric,
+	ar_ap_accno text, 
+	cash_accno text, 
+	payment_date date, 
+	account_class int,
+	job_id int references pending_job(id) 
+		DEFAULT currval('pending_job_id_seq') -- latest pending_job id generated in this session
+);
+
+CREATE INDEX payments_queue_job_id ON payments_queue(job_id);
+
+COMMENT ON table payments_queue IS 
+$$ This is a holding table and hence not a candidate for normalization.
+Jobs should be deleted from this table when they complete successfully.$$;
+
 commit;

Modified: trunk/sql/modules/Payment.sql
===================================================================
--- trunk/sql/modules/Payment.sql	2007-12-11 23:15:31 UTC (rev 1974)
+++ trunk/sql/modules/Payment.sql	2007-12-12 22:27:12 UTC (rev 1975)
@@ -207,6 +207,88 @@
 cnsisting of outstanding invoices.
 $$;
 
+CREATE OR REPLACE FUNCTION payment_create_queue_entry() RETURNS int AS -- NOTE(review): placeholder with an empty body; not yet implemented
+$$
+$$ LANGUAGE PLPGSQL;
+
+CREATE OR REPLACE FUNCTION payment_bulk_queue -- stores one bulk payment in payments_queue for later posting
+(in_transactions numeric[], in_batch_id int, in_source text, in_total numeric,
+	in_ar_ap_accno text, in_cash_accno text, 
+	in_payment_date date, in_account_class int)
+returns int as
+$$
+BEGIN
+	INSERT INTO payments_queue -- job_id is not listed, so its column default applies
+	(transactions, batch_id, source, total, ar_ap_accno, cash_accno,
+		payment_date, account_class)
+	VALUES 
+	(in_transactions, in_batch_id, in_source, in_total, in_ar_ap_accno,
+		in_cash_accno, in_payment_date, in_account_class);
+
+	RETURN array_upper(in_transactions, 1) - 
+		array_lower(in_transactions, 1); -- NOTE(review): upper - lower is one less than the element count; confirm intended
+END;
+$$ LANGUAGE PLPGSQL;
+
+CREATE OR REPLACE FUNCTION job__process_payments(in_job_id int)
+RETURNS bool AS $$
+-- Posts every payments_queue row of job in_job_id as the user who entered it, then marks the job successful.
+DECLARE 
+	queue_record RECORD; -- current payments_queue row
+	t_auth_name text;    -- pending_job.entered_by for this job
+BEGIN
+	-- TODO:  Move the set session authorization into a utility function
+	SELECT entered_by INTO t_auth_name FROM pending_job WHERE id = in_job_id;
+
+	EXECUTE 'SET SESSION AUTHORIZATION ' || quote_ident(t_auth_name);
+	FOR queue_record IN
+		SELECT * from payments_queue WHERE job_id = in_job_id
+	LOOP
+		PERFORM payment_bulk_post
+		(queue_record.transactions, queue_record.batch_id,
+			queue_record.source, queue_record.total,
+			queue_record.ar_ap_accno, queue_record.cash_accno,
+			queue_record.payment_date, queue_record.account_class);
+	END LOOP;
+	UPDATE pending_job
+	SET completed_at = timeofday()::timestamp, success = true
+	WHERE id = in_job_id;
+	RETURN TRUE;
+END;
+$$ language plpgsql;
+
+CREATE OR REPLACE FUNCTION job__create(in_batch_class int, in_batch_id int) -- registers a pending job and returns its id
+RETURNS int AS
+$$
+BEGIN
+	INSERT INTO pending_job (batch_class, batch_id) -- remaining columns take their defaults
+	VALUES (in_batch_class, in_batch_id);
+
+	RETURN currval('pending_job_id_seq'); -- id assigned by the INSERT above
+END;
+$$ LANGUAGE PLPGSQL;
+
+CREATE TYPE job__status AS (
+	completed int, -- 1 for completed, 0 for no
+	success int, -- 1 for success, 0 for no
+	completed_at timestamp,
+	error_condition text -- error if not successful
+);
+
+CREATE OR REPLACE FUNCTION job__status(in_job_id int) RETURNS job__status AS
+$$
+DECLARE out_row job__status; -- status row returned for pending_job in_job_id
+BEGIN
+	-- completed is 1 once completed_at is set, matching the job__status type.
+	SELECT  (completed_at IS NOT NULL)::INT, success::int, completed_at,
+		error_condition
+	INTO out_row 
+	FROM pending_job
+	WHERE id = in_job_id;
+	RETURN out_row;
+END;
+$$ language plpgsql;
+
 CREATE OR REPLACE FUNCTION payment_bulk_post
 (in_transactions numeric[], in_batch_id int, in_source text, in_total numeric,
 	in_ar_ap_accno text, in_cash_accno text, 
@@ -265,7 +347,6 @@
 				CASE WHEN t_voucher_id IS NULL THEN true
 				ELSE false END,
 				t_voucher_id, in_payment_date);
-		insert into test_pay(id, amount) values (in_transactions[out_count][1],in_transactions[out_count][2]);	
 		UPDATE ap 
 		set paid = paid +in_transactions[out_count][2]
 		where id =in_transactions[out_count][1];
@@ -449,21 +530,37 @@
 COMMENT ON FUNCTION payment_get_vc_info(in_entity_id int) IS
 $$ This function return vendor or customer info, its under construction $$;
 
+CREATE TYPE payment_record AS (
+	amount numeric,
+	meta_number text,
+	company_paid text,
+        cash_account_id int,
+        cash_accno text,
+        cash_account_description text,
+        ar_ap_account_id int,
+        ar_ap_accno text,
+        ar_ap_description text
+);
+
 CREATE OR REPLACE FUNCTION payment__retrieve
 (in_source text, in_meta_number text, in_account_class int, in_cash_accno text)
-RETURNS SETOF numeric AS
+RETURNS SETOF payment_record AS
 $$
-DECLARE out_row RECORD;
+DECLARE out_row payment_record;
 BEGIN
 	FOR out_row IN 
-		SELECT amount * -1 AS amount
-		FROM acc_trans
-		WHERE source = in_source
-			AND trans_id IN (
-				SELECT id FROM ar 
-				WHERE in_account_class = 2 AND
-					entity_credit_account = 
-						(select id 
+		SELECT sum(case when at.amount > 0 then at.amount else 0 end) 
+				AS amount, ec.meta_number, 
+			c.legal_name, max(cc.id), max(cc.accno), 
+			max(cc.description), max(ac.id), max(ac.accno), 
+			max(ac.description)
+		FROM acc_trans at
+		JOIN entity_credit_account ec ON
+			(at.trans_id IN 
+				(select id FROM ar 
+				WHERE in_account_class = 2
+					AND entity_credit_account =
+						(SELECT id 
 						FROM entity_credit_account
 						WHERE meta_number 
 							= in_meta_number
@@ -478,18 +575,31 @@
 						WHERE meta_number 
 							= in_meta_number
 							AND entity_class = 
-							in_account_class)
-			AND chart_id = 
-				(SELECT id FROM chart 
-				WHERE accno = in_cash_accno)
+							in_account_class)))
+				
+		JOIN company c ON (ec.entity_id = c.entity_id)
+		LEFT JOIN chart cc ON (at.chart_id = cc.id AND
+			cc.link LIKE '%paid%')
+		JOIN chart ac ON (at.chart_id = ac.id AND
+			((in_account_class = 1 AND ac.link = 'AP') OR
+			 (in_account_class = 2 AND ac.link = 'AR')))
+		WHERE source = in_source
+		GROUP BY ec.meta_number, c.legal_name
+		HAVING max(cc.accno) = in_cash_accno
 	LOOP
-		return next out_row.amount;
+		return next out_row;
 	END LOOP;	
 END;
 $$ LANGUAGE plpgsql;
+
 CREATE OR REPLACE FUNCTION payment__reverse
 (in_source text, in_date_paid date, in_credit_id int, in_cash_accno text)
 RETURNS INT 
 AS $$
-	
+DECLARE
+    count int;
+BEGIN
+    count := 0;
+    FOR -- NOTE(review): unfinished stub; the loop header and body are missing, so this function cannot compile as committed
+END;
 $$ LANGUAGE PLPGSQL;

Modified: trunk/sql/modules/Roles.sql
===================================================================
--- trunk/sql/modules/Roles.sql	2007-12-11 23:15:31 UTC (rev 1974)
+++ trunk/sql/modules/Roles.sql	2007-12-12 22:27:12 UTC (rev 1975)
@@ -1387,7 +1387,9 @@
 GRANT SELECT ON business, exchangerate, department, shipto, tax TO public;
 GRANT ALL ON recurring, recurringemail, recurringprint, status TO public; 
 GRANT ALL ON transactions, entity_employee, customer, vendor TO public;
---TODO, lock recurring down more
+GRANT ALL ON pending_job, payments_queue TO PUBLIC;
+GRANT ALL ON pending_job_id_seq TO public;
+--TODO, lock recurring, pending_job, payments_queue down more
 
 -- CT:  The following grant is required for now, but will hopefully become less 
 -- important when we get to 1.4 and can more sensibly lock things down.

Modified: trunk/sql/modules/Settings.sql
===================================================================
--- trunk/sql/modules/Settings.sql	2007-12-11 23:15:31 UTC (rev 1974)
+++ trunk/sql/modules/Settings.sql	2007-12-12 22:27:12 UTC (rev 1975)
@@ -15,7 +15,7 @@
 	out_value varchar;
 BEGIN
 	SELECT value INTO out_value FROM defaults WHERE setting_key = in_key;
-	RETURN value;
+	RETURN out_value;
 END;
 $$ LANGUAGE plpgsql;
 

Added: trunk/utils/process_queue/config.pl
===================================================================
--- trunk/utils/process_queue/config.pl	                        (rev 0)
+++ trunk/utils/process_queue/config.pl	2007-12-12 22:27:12 UTC (rev 1975)
@@ -0,0 +1,18 @@
+#!/usr/bin/perl
+# Connection settings for process_queue.pl, which loads this file via require.
+use strict;
+use warnings;
+
+# The database containing LedgerSMB
+our $database = ("ledgersmb");
+
+# The user to connect with.  This must be a superuser so that set session auth
+# works as expected
+
+our $db_user = "postgres";
+
+# The password for the db user:
+our $db_passwd = "mypasswd";
+
+1;
+

Added: trunk/utils/process_queue/process_queue.pl
===================================================================
--- trunk/utils/process_queue/process_queue.pl	                        (rev 0)
+++ trunk/utils/process_queue/process_queue.pl	2007-12-12 22:27:12 UTC (rev 1975)
@@ -0,0 +1,89 @@
+#!/usr/bin/perl
+
+# TODO:  Add POD -CT
+
+require "config.pl";
+
+use DBI;
+# TODO:  Convert config.pl to namespace so we can use strict.
+
+my $cycle_delay;    # seconds between polls; set by db_init()
+
+my $dbh = db_init();
+
+# Main loop: subscribe to the job_entered notification raised by the
+# pending_job trigger, then poll for it every $cycle_delay seconds.
+# AutoCommit is off and LISTEN only takes effect once its transaction
+# commits, so the commit below is required for notifications to arrive.
+
+$dbh->do("LISTEN job_entered");
+$dbh->commit;
+while (1) {    # loop infinitely
+    if ( $dbh->func('pg_notifies') ) {
+        on_notify();
+    }
+    sleep $cycle_delay;
+}
+
+sub on_notify {    # run every uncompleted pending_job row, lowest id first
+    my $job_id = 1;
+    while ($job_id){
+        # min(id) cannot be combined with FOR UPDATE; use ORDER BY/LIMIT.
+        ($job_id) = $dbh->selectrow_array(
+            "SELECT id FROM pending_job
+              WHERE completed_at IS NULL
+              ORDER BY id LIMIT 1 FOR UPDATE"
+        );
+        if ($job_id){
+            $job_id = $dbh->quote($job_id);
+            my ($job_class) = $dbh->selectrow_array(
+                "select class from batch_class where id =
+                    (select batch_class from pending_job where id = $job_id)"
+            );
+            # Right now, we assume that every pending job has a batch id.
+            # Longer-run we may need to use a template handle as well. -CT
+            my $ok = eval {    # RaiseError is set, so failures arrive via die
+                $dbh->do('SELECT '
+                  . $dbh->quote_identifier("job__process_$job_class") . "($job_id)");
+                $dbh->commit;
+            };
+            if (!$ok){ # record the failure; queue holding tables are kept for review -CT
+                 my $errstr = $@ || $dbh->errstr || 'unknown error';
+                 $dbh->rollback;
+                 $dbh->do( "UPDATE pending_job
+                      SET completed_at = now(), success = false,
+                          error_condition = " . $dbh->quote($errstr) . "
+                      WHERE id = $job_id" );
+                 $dbh->commit;
+            }
+            # The job functions use set session authorization, so reconnect
+            # to reset administrative permissions, then re-subscribe. -CT
+            $dbh->disconnect;
+            $dbh = db_init(); 
+            $dbh->do("LISTEN job_entered");
+            $dbh->commit;
+        }
+    }
+}
+
+sub db_init {
+    # Open a fresh connection using the globals that config.pl defines
+    # ($database, $db_user, $db_passwd).  Transactions are explicit
+    # (AutoCommit off) and DBI errors are raised as exceptions.
+    my %connect_attr = (
+        AutoCommit => 0,
+        PrintError => 0,
+        RaiseError => 1,
+    );
+    my $handle = DBI->connect( "dbi:Pg:dbname=$database",
+        $db_user, $db_passwd, \%connect_attr );
+    $handle->{pg_enable_utf8} = 1;
+
+    # Side effect: refresh the file-level $cycle_delay from the defaults
+    # table on every (re)connect; a missing or zero value is fatal.
+    my $poll_sql = "SELECT value FROM defaults 
+		WHERE setting_key = 'poll_frequency'";
+    ($cycle_delay) = $handle->selectrow_array($poll_sql);
+    die "No Polling Frequency Set Up!" unless $cycle_delay;
+    return $handle;
+}


This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.