.\" -*- mode: troff; coding: utf-8 -*- .\" Automatically generated by Pod::Man 5.01 (Pod::Simple 3.43) .\" .\" Standard preamble: .\" ======================================================================== .de Sp \" Vertical space (when we can't use .PP) .if t .sp .5v .if n .sp .. .de Vb \" Begin verbatim text .ft CW .nf .ne \\$1 .. .de Ve \" End verbatim text .ft R .fi .. .\" \*(C` and \*(C' are quotes in nroff, nothing in troff, for use with C<>. .ie n \{\ . ds C` "" . ds C' "" 'br\} .el\{\ . ds C` . ds C' 'br\} .\" .\" Escape single quotes in literal strings from groff's Unicode transform. .ie \n(.g .ds Aq \(aq .el .ds Aq ' .\" .\" If the F register is >0, we'll generate index entries on stderr for .\" titles (.TH), headers (.SH), subsections (.SS), items (.Ip), and index .\" entries marked with X<> in POD. Of course, you'll have to process the .\" output yourself in some meaningful fashion. .\" .\" Avoid warning from groff about undefined register 'F'. .de IX .. .nr rF 0 .if \n(.g .if rF .nr rF 1 .if (\n(rF:(\n(.g==0)) \{\ . if \nF \{\ . de IX . tm Index:\\$1\t\\n%\t"\\$2" .. . if !\nF==2 \{\ . nr % 0 . nr F 2 . \} . \} .\} .rr rF .\" ======================================================================== .\" .IX Title "Minion::Backend::Pg 3pm" .TH Minion::Backend::Pg 3pm 2024-03-24 "perl v5.38.2" "User Contributed Perl Documentation" .\" For nroff, turn off justification. Always turn off hyphenation; it makes .\" way too many mistakes in technical documents. .if n .ad l .nh .SH NAME Minion::Backend::Pg \- PostgreSQL backend .SH SYNOPSIS .IX Header "SYNOPSIS" .Vb 1 \& use Minion::Backend::Pg; \& \& my $backend = Minion::Backend::Pg\->new(\*(Aqpostgresql://postgres@/test\*(Aq); .Ve .SH DESCRIPTION .IX Header "DESCRIPTION" Minion::Backend::Pg is a backend for Minion based on Mojo::Pg. All necessary tables will be created automatically with a set of migrations named \f(CW\*(C`minion\*(C'\fR. Note that this backend uses many bleeding edge features, so only the latest, stable version of PostgreSQL is fully supported. .SH ATTRIBUTES .IX Header "ATTRIBUTES" Minion::Backend::Pg inherits all attributes from Minion::Backend and implements the following new ones. .SS pg .IX Subsection "pg" .Vb 2 \& my $pg = $backend\->pg; \& $backend = $backend\->pg(Mojo::Pg\->new); .Ve .PP Mojo::Pg object used to store all data. .SH METHODS .IX Header "METHODS" Minion::Backend::Pg inherits all methods from Minion::Backend and implements the following new ones. .SS broadcast .IX Subsection "broadcast" .Vb 3 \& my $bool = $backend\->broadcast(\*(Aqsome_command\*(Aq); \& my $bool = $backend\->broadcast(\*(Aqsome_command\*(Aq, [@args]); \& my $bool = $backend\->broadcast(\*(Aqsome_command\*(Aq, [@args], [$id1, $id2, $id3]); .Ve .PP Broadcast remote control command to one or more workers. .SS dequeue .IX Subsection "dequeue" .Vb 2 \& my $job_info = $backend\->dequeue($worker_id, 0.5); \& my $job_info = $backend\->dequeue($worker_id, 0.5, {queues => [\*(Aqimportant\*(Aq]}); .Ve .PP Wait a given amount of time in seconds for a job, dequeue it and transition from \f(CW\*(C`inactive\*(C'\fR to \f(CW\*(C`active\*(C'\fR state, or return \f(CW\*(C`undef\*(C'\fR if queues were empty. .PP These options are currently available: .IP id 2 .IX Item "id" .Vb 1 \& id => \*(Aq10023\*(Aq .Ve .Sp Dequeue a specific job. .IP min_priority 2 .IX Item "min_priority" .Vb 1 \& min_priority => 3 .Ve .Sp Do not dequeue jobs with a lower priority. .IP queues 2 .IX Item "queues" .Vb 1 \& queues => [\*(Aqimportant\*(Aq] .Ve .Sp One or more queues to dequeue jobs from, defaults to \f(CW\*(C`default\*(C'\fR. .PP These fields are currently available: .IP args 2 .IX Item "args" .Vb 1 \& args => [\*(Aqfoo\*(Aq, \*(Aqbar\*(Aq] .Ve .Sp Job arguments. .IP id 2 .IX Item "id" .Vb 1 \& id => \*(Aq10023\*(Aq .Ve .Sp Job ID. .IP retries 2 .IX Item "retries" .Vb 1 \& retries => 3 .Ve .Sp Number of times job has been retried. .IP task 2 .IX Item "task" .Vb 1 \& task => \*(Aqfoo\*(Aq .Ve .Sp Task name. .SS enqueue .IX Subsection "enqueue" .Vb 3 \& my $job_id = $backend\->enqueue(\*(Aqfoo\*(Aq); \& my $job_id = $backend\->enqueue(foo => [@args]); \& my $job_id = $backend\->enqueue(foo => [@args] => {priority => 1}); .Ve .PP Enqueue a new job with \f(CW\*(C`inactive\*(C'\fR state. .PP These options are currently available: .IP attempts 2 .IX Item "attempts" .Vb 1 \& attempts => 25 .Ve .Sp Number of times performing this job will be attempted, with a delay based on "backoff" in Minion after the first attempt, defaults to \f(CW1\fR. .IP delay 2 .IX Item "delay" .Vb 1 \& delay => 10 .Ve .Sp Delay job for this many seconds (from now), defaults to \f(CW0\fR. .IP expire 2 .IX Item "expire" .Vb 1 \& expire => 300 .Ve .Sp Job is valid for this many seconds (from now) before it expires. .IP lax 2 .IX Item "lax" .Vb 1 \& lax => 1 .Ve .Sp Existing jobs this job depends on may also have transitioned to the \f(CW\*(C`failed\*(C'\fR state to allow for it to be processed, defaults to \f(CW\*(C`false\*(C'\fR. Note that this option is \fBEXPERIMENTAL\fR and might change without warning! .IP notes 2 .IX Item "notes" .Vb 1 \& notes => {foo => \*(Aqbar\*(Aq, baz => [1, 2, 3]} .Ve .Sp Hash reference with arbitrary metadata for this job. .IP parents 2 .IX Item "parents" .Vb 1 \& parents => [$id1, $id2, $id3] .Ve .Sp One or more existing jobs this job depends on, and that need to have transitioned to the state \f(CW\*(C`finished\*(C'\fR before it can be processed. .IP priority 2 .IX Item "priority" .Vb 1 \& priority => 5 .Ve .Sp Job priority, defaults to \f(CW0\fR. Jobs with a higher priority get performed first. Priorities can be positive or negative, but should be in the range between \f(CW100\fR and \f(CW\-100\fR. .IP queue 2 .IX Item "queue" .Vb 1 \& queue => \*(Aqimportant\*(Aq .Ve .Sp Queue to put job in, defaults to \f(CW\*(C`default\*(C'\fR. .SS fail_job .IX Subsection "fail_job" .Vb 4 \& my $bool = $backend\->fail_job($job_id, $retries); \& my $bool = $backend\->fail_job($job_id, $retries, \*(AqSomething went wrong!\*(Aq); \& my $bool = $backend\->fail_job( \& $job_id, $retries, {whatever => \*(AqSomething went wrong!\*(Aq}); .Ve .PP Transition from \f(CW\*(C`active\*(C'\fR to \f(CW\*(C`failed\*(C'\fR state with or without a result, and if there are attempts remaining, transition back to \f(CW\*(C`inactive\*(C'\fR with a delay based on "backoff" in Minion. .SS finish_job .IX Subsection "finish_job" .Vb 4 \& my $bool = $backend\->finish_job($job_id, $retries); \& my $bool = $backend\->finish_job($job_id, $retries, \*(AqAll went well!\*(Aq); \& my $bool = $backend\->finish_job( \& $job_id, $retries, {whatever => \*(AqAll went well!\*(Aq}); .Ve .PP Transition from \f(CW\*(C`active\*(C'\fR to \f(CW\*(C`finished\*(C'\fR state with or without a result. .SS history .IX Subsection "history" .Vb 1 \& my $history = $backend\->history; .Ve .PP Get history information for job queue. .PP These fields are currently available: .IP daily 2 .IX Item "daily" .Vb 1 \& daily => [{epoch => 12345, finished_jobs => 95, failed_jobs => 2}, ...] .Ve .Sp Hourly counts for processed jobs from the past day. .SS list_jobs .IX Subsection "list_jobs" .Vb 2 \& my $results = $backend\->list_jobs($offset, $limit); \& my $results = $backend\->list_jobs($offset, $limit, {states => [\*(Aqinactive\*(Aq]}); .Ve .PP Returns the information about jobs in batches. .PP .Vb 2 \& # Get the total number of results (without limit) \& my $num = $backend\->list_jobs(0, 100, {queues => [\*(Aqimportant\*(Aq]})\->{total}; \& \& # Check job state \& my $results = $backend\->list_jobs(0, 1, {ids => [$job_id]}); \& my $state = $results\->{jobs}[0]{state}; \& \& # Get job result \& my $results = $backend\->list_jobs(0, 1, {ids => [$job_id]}); \& my $result = $results\->{jobs}[0]{result}; .Ve .PP These options are currently available: .IP before 2 .IX Item "before" .Vb 1 \& before => 23 .Ve .Sp List only jobs before this id. .IP ids 2 .IX Item "ids" .Vb 1 \& ids => [\*(Aq23\*(Aq, \*(Aq24\*(Aq] .Ve .Sp List only jobs with these ids. .IP notes 2 .IX Item "notes" .Vb 1 \& notes => [\*(Aqfoo\*(Aq, \*(Aqbar\*(Aq] .Ve .Sp List only jobs with one of these notes. .IP queues 2 .IX Item "queues" .Vb 1 \& queues => [\*(Aqimportant\*(Aq, \*(Aqunimportant\*(Aq] .Ve .Sp List only jobs in these queues. .IP states 2 .IX Item "states" .Vb 1 \& states => [\*(Aqinactive\*(Aq, \*(Aqactive\*(Aq] .Ve .Sp List only jobs in these states. .IP tasks 2 .IX Item "tasks" .Vb 1 \& tasks => [\*(Aqfoo\*(Aq, \*(Aqbar\*(Aq] .Ve .Sp List only jobs for these tasks. .PP These fields are currently available: .IP args 2 .IX Item "args" .Vb 1 \& args => [\*(Aqfoo\*(Aq, \*(Aqbar\*(Aq] .Ve .Sp Job arguments. .IP attempts 2 .IX Item "attempts" .Vb 1 \& attempts => 25 .Ve .Sp Number of times performing this job will be attempted. .IP children 2 .IX Item "children" .Vb 1 \& children => [\*(Aq10026\*(Aq, \*(Aq10027\*(Aq, \*(Aq10028\*(Aq] .Ve .Sp Jobs depending on this job. .IP created 2 .IX Item "created" .Vb 1 \& created => 784111777 .Ve .Sp Epoch time job was created. .IP delayed 2 .IX Item "delayed" .Vb 1 \& delayed => 784111777 .Ve .Sp Epoch time job was delayed to. .IP expires 2 .IX Item "expires" .Vb 1 \& expires => 784111777 .Ve .Sp Epoch time job is valid until before it expires. .IP finished 2 .IX Item "finished" .Vb 1 \& finished => 784111777 .Ve .Sp Epoch time job was finished. .IP id 2 .IX Item "id" .Vb 1 \& id => 10025 .Ve .Sp Job id. .IP lax 2 .IX Item "lax" .Vb 1 \& lax => 0 .Ve .Sp Existing jobs this job depends on may also have failed to allow for it to be processed. .IP notes 2 .IX Item "notes" .Vb 1 \& notes => {foo => \*(Aqbar\*(Aq, baz => [1, 2, 3]} .Ve .Sp Hash reference with arbitrary metadata for this job. .IP parents 2 .IX Item "parents" .Vb 1 \& parents => [\*(Aq10023\*(Aq, \*(Aq10024\*(Aq, \*(Aq10025\*(Aq] .Ve .Sp Jobs this job depends on. .IP priority 2 .IX Item "priority" .Vb 1 \& priority => 3 .Ve .Sp Job priority. .IP queue 2 .IX Item "queue" .Vb 1 \& queue => \*(Aqimportant\*(Aq .Ve .Sp Queue name. .IP result 2 .IX Item "result" .Vb 1 \& result => \*(AqAll went well!\*(Aq .Ve .Sp Job result. .IP retried 2 .IX Item "retried" .Vb 1 \& retried => 784111777 .Ve .Sp Epoch time job has been retried. .IP retries 2 .IX Item "retries" .Vb 1 \& retries => 3 .Ve .Sp Number of times job has been retried. .IP started 2 .IX Item "started" .Vb 1 \& started => 784111777 .Ve .Sp Epoch time job was started. .IP state 2 .IX Item "state" .Vb 1 \& state => \*(Aqinactive\*(Aq .Ve .Sp Current job state, usually \f(CW\*(C`active\*(C'\fR, \f(CW\*(C`failed\*(C'\fR, \f(CW\*(C`finished\*(C'\fR or \f(CW\*(C`inactive\*(C'\fR. .IP task 2 .IX Item "task" .Vb 1 \& task => \*(Aqfoo\*(Aq .Ve .Sp Task name. .IP time 2 .IX Item "time" .Vb 1 \& time => 78411177 .Ve .Sp Server time. .IP worker 2 .IX Item "worker" .Vb 1 \& worker => \*(Aq154\*(Aq .Ve .Sp Id of worker that is processing the job. .SS list_locks .IX Subsection "list_locks" .Vb 2 \& my $results = $backend\->list_locks($offset, $limit); \& my $results = $backend\->list_locks($offset, $limit, {names => [\*(Aqfoo\*(Aq]}); .Ve .PP Returns information about locks in batches. .PP .Vb 2 \& # Get the total number of results (without limit) \& my $num = $backend\->list_locks(0, 100, {names => [\*(Aqbar\*(Aq]})\->{total}; \& \& # Check expiration time \& my $results = $backend\->list_locks(0, 1, {names => [\*(Aqfoo\*(Aq]}); \& my $expires = $results\->{locks}[0]{expires}; .Ve .PP These options are currently available: .IP names 2 .IX Item "names" .Vb 1 \& names => [\*(Aqfoo\*(Aq, \*(Aqbar\*(Aq] .Ve .Sp List only locks with these names. .PP These fields are currently available: .IP expires 2 .IX Item "expires" .Vb 1 \& expires => 784111777 .Ve .Sp Epoch time this lock will expire. .IP id 2 .IX Item "id" .Vb 1 \& id => 1 .Ve .Sp Lock id. .IP name 2 .IX Item "name" .Vb 1 \& name => \*(Aqfoo\*(Aq .Ve .Sp Lock name. .SS list_workers .IX Subsection "list_workers" .Vb 2 \& my $results = $backend\->list_workers($offset, $limit); \& my $results = $backend\->list_workers($offset, $limit, {ids => [23]}); .Ve .PP Returns information about workers in batches. .PP .Vb 2 \& # Get the total number of results (without limit) \& my $num = $backend\->list_workers(0, 100)\->{total}; \& \& # Check worker host \& my $results = $backend\->list_workers(0, 1, {ids => [$worker_id]}); \& my $host = $results\->{workers}[0]{host}; .Ve .PP These options are currently available: .IP before 2 .IX Item "before" .Vb 1 \& before => 23 .Ve .Sp List only workers before this id. .IP ids 2 .IX Item "ids" .Vb 1 \& ids => [\*(Aq23\*(Aq, \*(Aq24\*(Aq] .Ve .Sp List only workers with these ids. .PP These fields are currently available: .IP id 2 .IX Item "id" .Vb 1 \& id => 22 .Ve .Sp Worker id. .IP host 2 .IX Item "host" .Vb 1 \& host => \*(Aqlocalhost\*(Aq .Ve .Sp Worker host. .IP jobs 2 .IX Item "jobs" .Vb 1 \& jobs => [\*(Aq10023\*(Aq, \*(Aq10024\*(Aq, \*(Aq10025\*(Aq, \*(Aq10029\*(Aq] .Ve .Sp Ids of jobs the worker is currently processing. .IP notified 2 .IX Item "notified" .Vb 1 \& notified => 784111777 .Ve .Sp Epoch time worker sent the last heartbeat. .IP pid 2 .IX Item "pid" .Vb 1 \& pid => 12345 .Ve .Sp Process id of worker. .IP started 2 .IX Item "started" .Vb 1 \& started => 784111777 .Ve .Sp Epoch time worker was started. .IP status 2 .IX Item "status" .Vb 1 \& status => {queues => [\*(Aqdefault\*(Aq, \*(Aqimportant\*(Aq]} .Ve .Sp Hash reference with whatever status information the worker would like to share. .SS lock .IX Subsection "lock" .Vb 2 \& my $bool = $backend\->lock(\*(Aqfoo\*(Aq, 3600); \& my $bool = $backend\->lock(\*(Aqfoo\*(Aq, 3600, {limit => 20}); .Ve .PP Try to acquire a named lock that will expire automatically after the given amount of time in seconds. An expiration time of \f(CW0\fR can be used to check if a named lock already exists without creating one. .PP These options are currently available: .IP limit 2 .IX Item "limit" .Vb 1 \& limit => 20 .Ve .Sp Number of shared locks with the same name that can be active at the same time, defaults to \f(CW1\fR. .SS new .IX Subsection "new" .Vb 2 \& my $backend = Minion::Backend::Pg\->new(\*(Aqpostgresql://postgres@/test\*(Aq); \& my $backend = Minion::Backend::Pg\->new(Mojo::Pg\->new); .Ve .PP Construct a new Minion::Backend::Pg object. .SS note .IX Subsection "note" .Vb 1 \& my $bool = $backend\->note($job_id, {mojo => \*(Aqrocks\*(Aq, minion => \*(Aqtoo\*(Aq}); .Ve .PP Change one or more metadata fields for a job. Setting a value to \f(CW\*(C`undef\*(C'\fR will remove the field. .SS receive .IX Subsection "receive" .Vb 1 \& my $commands = $backend\->receive($worker_id); .Ve .PP Receive remote control commands for worker. .SS register_worker .IX Subsection "register_worker" .Vb 4 \& my $worker_id = $backend\->register_worker; \& my $worker_id = $backend\->register_worker($worker_id); \& my $worker_id = $backend\->register_worker( \& $worker_id, {status => {queues => [\*(Aqdefault\*(Aq, \*(Aqimportant\*(Aq]}}); .Ve .PP Register worker or send heartbeat to show that this worker is still alive. .PP These options are currently available: .IP status 2 .IX Item "status" .Vb 1 \& status => {queues => [\*(Aqdefault\*(Aq, \*(Aqimportant\*(Aq]} .Ve .Sp Hash reference with whatever status information the worker would like to share. .SS remove_job .IX Subsection "remove_job" .Vb 1 \& my $bool = $backend\->remove_job($job_id); .Ve .PP Remove \f(CW\*(C`failed\*(C'\fR, \f(CW\*(C`finished\*(C'\fR or \f(CW\*(C`inactive\*(C'\fR job from queue. .SS repair .IX Subsection "repair" .Vb 1 \& $backend\->repair; .Ve .PP Repair worker registry and job queue if necessary. .SS reset .IX Subsection "reset" .Vb 1 \& $backend\->reset({all => 1}); .Ve .PP Reset job queue. .PP These options are currently available: .IP all 2 .IX Item "all" .Vb 1 \& all => 1 .Ve .Sp Reset everything. .IP locks 2 .IX Item "locks" .Vb 1 \& locks => 1 .Ve .Sp Reset only locks. .SS retry_job .IX Subsection "retry_job" .Vb 2 \& my $bool = $backend\->retry_job($job_id, $retries); \& my $bool = $backend\->retry_job($job_id, $retries, {delay => 10}); .Ve .PP Transition job back to \f(CW\*(C`inactive\*(C'\fR state, already \f(CW\*(C`inactive\*(C'\fR jobs may also be retried to change options. .PP These options are currently available: .IP attempts 2 .IX Item "attempts" .Vb 1 \& attempts => 25 .Ve .Sp Number of times performing this job will be attempted. .IP delay 2 .IX Item "delay" .Vb 1 \& delay => 10 .Ve .Sp Delay job for this many seconds (from now), defaults to \f(CW0\fR. .IP expire 2 .IX Item "expire" .Vb 1 \& expire => 300 .Ve .Sp Job is valid for this many seconds (from now) before it expires. .IP lax 2 .IX Item "lax" .Vb 1 \& lax => 1 .Ve .Sp Existing jobs this job depends on may also have transitioned to the \f(CW\*(C`failed\*(C'\fR state to allow for it to be processed, defaults to \f(CW\*(C`false\*(C'\fR. Note that this option is \fBEXPERIMENTAL\fR and might change without warning! .IP parents 2 .IX Item "parents" .Vb 1 \& parents => [$id1, $id2, $id3] .Ve .Sp Jobs this job depends on. .IP priority 2 .IX Item "priority" .Vb 1 \& priority => 5 .Ve .Sp Job priority. .IP queue 2 .IX Item "queue" .Vb 1 \& queue => \*(Aqimportant\*(Aq .Ve .Sp Queue to put job in. .SS stats .IX Subsection "stats" .Vb 1 \& my $stats = $backend\->stats; .Ve .PP Get statistics for the job queue. .PP These fields are currently available: .IP active_jobs 2 .IX Item "active_jobs" .Vb 1 \& active_jobs => 100 .Ve .Sp Number of jobs in \f(CW\*(C`active\*(C'\fR state. .IP active_locks 2 .IX Item "active_locks" .Vb 1 \& active_locks => 100 .Ve .Sp Number of active named locks. .IP active_workers 2 .IX Item "active_workers" .Vb 1 \& active_workers => 100 .Ve .Sp Number of workers that are currently processing a job. .IP delayed_jobs 2 .IX Item "delayed_jobs" .Vb 1 \& delayed_jobs => 100 .Ve .Sp Number of jobs in \f(CW\*(C`inactive\*(C'\fR state that are scheduled to run at specific time in the future. .IP enqueued_jobs 2 .IX Item "enqueued_jobs" .Vb 1 \& enqueued_jobs => 100000 .Ve .Sp Rough estimate of how many jobs have ever been enqueued. .IP failed_jobs 2 .IX Item "failed_jobs" .Vb 1 \& failed_jobs => 100 .Ve .Sp Number of jobs in \f(CW\*(C`failed\*(C'\fR state. .IP finished_jobs 2 .IX Item "finished_jobs" .Vb 1 \& finished_jobs => 100 .Ve .Sp Number of jobs in \f(CW\*(C`finished\*(C'\fR state. .IP inactive_jobs 2 .IX Item "inactive_jobs" .Vb 1 \& inactive_jobs => 100 .Ve .Sp Number of jobs in \f(CW\*(C`inactive\*(C'\fR state. .IP inactive_workers 2 .IX Item "inactive_workers" .Vb 1 \& inactive_workers => 100 .Ve .Sp Number of workers that are currently not processing a job. .IP uptime 2 .IX Item "uptime" .Vb 1 \& uptime => 1000 .Ve .Sp Uptime in seconds. .IP workers 2 .IX Item "workers" .Vb 1 \& workers => 200; .Ve .Sp Number of registered workers. .SS unlock .IX Subsection "unlock" .Vb 1 \& my $bool = $backend\->unlock(\*(Aqfoo\*(Aq); .Ve .PP Release a named lock. .SS unregister_worker .IX Subsection "unregister_worker" .Vb 1 \& $backend\->unregister_worker($worker_id); .Ve .PP Unregister worker. .SH "SEE ALSO" .IX Header "SEE ALSO" Minion, Minion::Guide, , Mojolicious::Guides, .