Lesen Sie nicht blockierend von mehreren Fifos parallel

1114
Ole Tange

Ich sitze manchmal mit einer Reihe von Ausgabefifos aus parallel laufenden Programmen. Ich möchte diese Fifos zusammenführen. Die naive Lösung ist:

cat fifo* > output 

Dies erfordert jedoch, dass der erste FIFO abgeschlossen ist, bevor das erste Byte vom zweiten FIFO gelesen wird. Dadurch werden die parallel laufenden Programme blockiert.

Ein anderer Weg ist:

(cat fifo1 & cat fifo2 & ... ) > output 

Dies kann jedoch die Ausgabe mischen, so dass die Ausgabe nur halbe Zeilen beträgt.

Beim Lesen von mehreren FIFOs müssen einige Regeln für das Zusammenführen der Dateien gelten. Normalerweise reicht es für mich aus, Zeile für Zeile zu machen, also suche ich nach etwas, das es tut:

parallel_non_blocking_cat fifo* > output 

Dies wird parallel von allen FIFOs lesen und die Ausgabe mit einer vollen Zeile auf einmal zusammenführen.

Ich kann sehen, dass es nicht schwer ist, dieses Programm zu schreiben. Alles was Sie tun müssen, ist:

  1. Öffne alle Fifos
  2. Führen Sie eine Blockierung für alle aus
  3. Nichtblockierung aus dem FIFO, das Daten enthält, in den Puffer für diesen FIFO lesen
  4. Wenn der Puffer eine vollständige Zeile (oder einen Datensatz) enthält, drucken Sie die Zeile aus
  5. wenn alle fifos geschlossen sind / eof: exit
  6. gehe zu 2

Meine Frage ist also nicht : Kann das gemacht werden?

Meine Frage ist: Ist es schon fertig und kann ich einfach ein Tool installieren, das dies tut?

6

3 Antworten auf die Frage

1
Ole Tange

Diese Lösung funktioniert nur, wenn die Anzahl der FIFOs geringer ist als die Anzahl der Jobs, die GNU parallel parallel ausführen kann (was durch Dateizugriffsnummern und Anzahl der Prozesse begrenzt ist):

parallel -j0 --line-buffer cat ::: fifo* 

Es scheint in der Lage zu sein, bis zu 500 MB / s zu verschieben:

window1$ mkfifo  window1$ parallel -j0 --line-buffer cat ::: | pv >/dev/null  window2$ parallel -j0 'cat bigfile > ' ::: * 

Und es mischt keine halben Zeilen:

window1$ mkfifo  window1$ parallel -j0 --line-buffer cat ::: &  window2$ parallel -j0 'traceroute {}.1.1.1 > {}' ::: * 

Jobs werden parallel gelesen (ein Job wird vor dem nächsten nicht vollständig gelesen):

window1$ mkfifo  window1$ parallel -j0 --line-buffer cat ::: * > >(tr -s ABCabc)  window2$ long_lines_with_pause() { perl -e 'print STDOUT "a"x30000_000," "'  perl -e 'print STDOUT "b"x30000_000," "'  perl -e 'print STDOUT "c"x30000_000," "'  echo "$1"  sleep 2  perl -e 'print STDOUT "A"x30000_000," "'  perl -e 'print STDOUT "B"x30000_000," "'  perl -e 'print STDOUT "C"x30000_000," "'  echo "$1"  } window2$ export -f long_lines_with_pause window2$ parallel -j0 'long_lines_with_pause {} > {}' ::: * 

Hier wird eine Menge von 'abc' (erste Hälfte eines Jobs) vor 'ABC' (zweite Hälfte des Jobs) gedruckt.

1
user928506

So,

tail -q -n+1 -f --pid=stop-tail-when-this-is-gone fifo1 fifo2 fifo3 

fast funktioniert (wie in den ersten Kommentaren zu dieser früheren Version meiner Antwort erwähnt, obwohl Sie vorher ein "for f in fifo *; cat </ dev / null> $ f & done" benötigen), um sicherzustellen, dass alle FIFOs für das Schreiben geöffnet sind weil coreutils tail O_RDONLY ohne O_NONBLOCK) öffnet.

Leider gibt es einen Fehler, der vorsieht, dass tailZeilen / Datensätze nur mit Eingaben von Pipes auf stdin, nicht aber mit Eingaben von Named Pipes / FIFOs in Argumenten enden. Eines Tages kann jemand Coreutils Schwanz reparieren.

Um eine echte Multi-Consumer / Single-Producer-Warteschlange zu erhalten, die die Zeilenenden berücksichtigt, können Sie in der Zwischenzeit ein einfaches C-Programm verwenden, das ich als 100-Zeilen-Zeile bezeichne tailpipes.c:

#include <stdio.h> #include <stdlib.h> #include <string.h> //TODO: Find&document build environments lacking memrchr #include <unistd.h> #include <fcntl.h> #include <time.h> #include <errno.h> #include <signal.h> #include <sys/types.h> #include <sys/stat.h> #define errstr strerror(errno)  char const * const Use = "%s: %s\n\nUsage:\n\n" " %s [-p PID] [-o OPEN_MODE(RW)] [-d DLM(\\n)] [-s SEC(.01)] PATH1 PATH2..\n\n" "Read delimited records (lines by default) from all input paths, writing only\n" "complete records to stdout and changing to a stop-at-EOF mode upon receiving\n" "SIGHUP (unlike \"tail -fqn+1\" which just dies) OR when we first notice that\n" "PID does not exist (if PID is given). Since by default fifos are opened RW,\n" "signal/PID termination is needed to not loop forever, but said FIFOs may be\n" "closed & reopened by other processes as often as is convenient. For one-shot\n" "writing style, ending input reads at the first EOF, use \"-oRO\". Also, DLM\n" "adjusts the record delimiter byte from the default newline, and SEC adjusts\n" "max select sleep time. Any improperly terminated final records are sent to\n" "stderr at the end of execution (with a label and bracketing).\n";  int writer_done; void sig(int signum) { writer_done = 1; }  int main(int N, char *V[]) { signed char ch; char *buf[N-1], delim = '\n', *V0 = V[0], *eol; int len[N-1], fds[N-1], nBf[N-1], i, fdMx = 0, nS = 0, nF = 0, oFlags = O_RDWR; pid_t pid = 0; ssize_t nR, nW; struct timespec tmOut = { 0, 10000000 }; //10 ms select time out fd_set fdRdMaster, fdRd; //If we get signaled before here, this program dies and data may be lost. //If possible use -p PID option w/pre-extant PID of appropriate lifetime. signal(SIGHUP, sig); //Install sig() for SIGHUP memset((void *)fds, 0, sizeof fds); memset((void *)len, 0, sizeof len); FD_ZERO(&fdRdMaster); fdRd = fdRdMaster; while ((ch = getopt(N, V, "d:p:s:o:")) != -1) switch (ch) { //For \0 do '' as a sep CLI arg double tO; case 'd': delim = optarg ? *optarg : '\n'; break; case 'p': pid = optarg ? atoi(optarg) : 0; break; case 's': tO = optarg ? atof(optarg) : .01; tmOut.tv_sec = (long)tO; tmOut.tv_nsec = 1e9 * (tO - tmOut.tv_sec); break; case 'o': oFlags = (optarg && strcasecmp(optarg, "ro") == 0) ? O_RDONLY | O_NONBLOCK : O_RDWR; break; default: return fprintf(stderr, Use, V0, "bad option", V0), 1; } V += optind; N -= optind; //Shift off option args if (N < 1) return fprintf(stderr, Use, V0, "too few arguments", V0), 2; setvbuf(stdout, NULL, _IONBF, 65536); //Full pipe on Linux for (i = 0; i < N; i++) //Check for any available V[] if ((fds[i] = open(V[i], oFlags)) != -1) { struct stat st; fstat(fds[i], &st); if (!S_ISFIFO(st.st_mode)) return fprintf(stderr,"%s: %s not a named pipe\n", V0, V[i]), 3; nF++; FD_SET(fds[i], &fdRdMaster); //Add fd to master copy for pselect buf[i] = malloc(nBf[i] = 4096); if (fds[i] > fdMx) fdMx = fds[i]; } else if (errno == EINTR) { //We may get signaled to finish up.. i--; continue; //..before we even this far. } else return fprintf(stderr, "%s: open(%s): %s\n", V0, V[i], errstr), 3; fdMx++; fdRd = fdRdMaster; while (nF && (nS = pselect(fdMx, &fdRd, NULL, NULL, &tmOut, NULL)) != -99) { if (pid && kill(pid, 0) != 0 && errno != EPERM) //Given pid didn't exist writer_done = 1; if (nS == 0 && writer_done) //No input & no writers break; else if (nS == -1) { //Some select error: if (errno != EINTR && errno == EAGAIN) //..fatal or retry return fprintf(stderr, "%s: select: %s\n", V0, errstr), 4; continue; } for (i = 0; nS > 0 && i < N; i++) { //For all fds.. if (fds[i] < 0 || !FD_ISSET(fds[i], &fdRd)) //with readable data continue; if ((nR = read(fds[i], buf[i]+len[i], nBf[i] - len[i])) < 0) { if (errno != EAGAIN && errno != EINTR) fprintf(stderr, "%s: read: %s\n", V0, errstr); continue; } else if (oFlags == (O_RDONLY | O_NONBLOCK) && nR == 0) { FD_CLR(fds[i], &fdRdMaster); nF--; free(buf[i]); } len[i] += nR; //Update Re: read data if ((eol = memrchr(buf[i], delim, len[i]))) { nW = eol - buf[i] + 1; //Only to last delim if (fwrite(buf[i], nW, 1, stdout) == 1) { memmove(buf[i], buf[i] + nW, len[i] - nW); len[i] -= nW; //Residual buffer shift } else return fprintf(stderr, "%s: %d bytes->stdout failed: %s\n", V0, len[i], errstr), 5; } else if (len[i] == nBf[i]) { //NoDelim&FullBuf=>GROW void *tmp; if (nBf[i] >= 1 << 30) return fprintf(stderr, "%s: record > 1 GiB\n", V0), 6; nBf[i] *= 2; if (!(tmp = realloc(buf[i], nBf[i]))) return fprintf(stderr,"%s: out of memory\n", V0), 7; buf[i] = tmp; } } fdRd = fdRdMaster; } for (i = 0; i < N; i++) //Ensure any residual data is.. if (len[i] > 0) { //..labeled,bracketed,=>stderr. fprintf(stderr, "%s: %s: final unterminated record: {", V0, V[i]); fwrite(buf[i], len[i], 1, stderr); fputs("}\n", stderr); } return 0; } 

Install ist Cut & Paste & cc -Owhatever tailpipes.c -o somewhere-in-$PATH/tailpipes. Getestet unter Linux & FreeBSD. Ich bekomme ungefähr 2500e6 Bytes / Sek., Aber der Speicher ist möglicherweise schneller als die 500e6 Bytes / Sek-Box.

Der Algorithmus ist ungefähr wie vorgeschlagen, aber allgemeiner. O_NONBLOCK wird nur mit O_RDONLY und mit einigen Optionen für die Benutzerfreundlichkeit benötigt, z. B. das Öffnen der FIFOs O_RDWR standardmäßig, sodass Schreiber viele Male schließen und erneut öffnen können und -p PID-Tracking für ein Race-Free-Protokoll verwenden. Sie können -oRO übergeben, um EOF zu verwenden, wenn Sie möchten. tailpipesbehandelt auch unvollständige Zeilen bei Programmende, sendet sie beschriftet und an stderr geklammert für den Fall, dass eine einfache Nachbearbeitung möglich ist, um die Datensätze vollständig zu machen, oder wenn Protokolle davon für das Debuggen nützlich sind.

Verwendungsbeispiel. GNU xargskann ein Einzelanwender-, Multi-Producer- / Fan-Out-Teil einer parallelen Pipeline mit Map-Reduction-ish sein tailpipes, der als Fan-In-Record-Bounding-Part fungiert und keinen Speicherplatz für temporäre Dateien verwendet:

export MYTEMP=$(mktemp -d /tmp/MYPROG.XXXXX) FIFOs=`n=0; while [ $n -lt 8 ]; do echo $MYTEMP/$n; n=$((n+1)); done` mkfifo $FIFOs sleep 2147483647 & p=$! #Cannot know xargs pid is good for long ( find . -print0 | xargs -0 -P8 --process-slot-var=MYSLOT MYPROGRAM kill $p ) & #Inform tailpipes writers are done tailpipes -p$p $FIFOs | CONSUMING-PIPELINE rm -rf $MYTEMP wait #Wait for xargs subshell to finish 

In dem oben genannten, ist es wichtig, dass A) naus geht 0an der entsprechenden oberen Grenze, weil das die Regelung xargsverwendet für MYSLOTund B) MYPROGRAMrichtet ihre Ausgänge in eine neu zugewiesenen $MYSLOT-keyed Datei wie $MYTEMP/$MYSLOTzB, exec > $MYTEMP/$MYSLOTwenn MYPROGRAMein Shell - Skript. Der Shell- / Programm-Wrapper könnte in vielen Fällen beseitigt werden, wenn xargsein hypothetischer --process-slot-outParameter für die Einrichtung seiner Kinderstadouts benötigt wird.

Ich habe das getestet. Es funktioniert nicht auf meinem CygWin-System: Wenn ich nur Daten an `fifo3` sende, passiert nichts. Was passieren sollte ist, dass die Daten auf "fifo3" gedruckt werden sollen. Sie scheinen davon auszugehen, dass die Produzenten ständig produzieren werden. Das ist keine sichere Annahme. Ole Tange vor 5 Jahren 0
Dein Schwanz blockiert wahrscheinlich offen für das Schreiben von fifo1 und fifo2. Für manche Produzenten ist es in Ordnung, nie etwas zu produzieren, aber ein Prozess muss jeden für das Schreiben öffnen, wie in meiner Erklärung der "Katzen" -Schleife im detaillierteren Beispiel beschrieben. user928506 vor 5 Jahren 0
Jetzt bekomme ich nicht blockierende, sondern Mischzeilen - das ist dasselbe wie mein Beispiel für "cat fifo1 &". Können Sie Ihr Beispiel anpassen, um zu zeigen, dass Sie mit Programmen wie "traceroute" umgehen können, die halbe Zeilen ausspucken? Ole Tange vor 5 Jahren 0
Ich persönlich bevorzuge reproduzierbare Beispiele für Fehler, aber ich konnte die Ausgabe wie folgt mischen: #! / Bin / sh sleep 1000 & tail --pid = $! -qfn + 1 0 1> out & ((echo -na; sleep 1; echo b; sleep 1; echo -nc)> 0 & (echo 1; echo -n 2)> 1 & wait pkill -P $$ sleep ) # oder töte den ganzen Schlaf, etc. Katze abhauen Huh. Ich kann die Formatierung nicht in das Kommentarfeld einfügen, ich bekam eine "c1" -Zeile und Zahlen und Buchstaben sollten nicht gemischt werden. Angesichts solcher Fehler habe ich meine Antwort aktualisiert, um ein zuverlässigeres C-Programm für diese Verwendung bereitzustellen. user928506 vor 5 Jahren 0
"Huh. Ich kann die Formatierung nicht in das Kommentarfeld einfügen", und Sie haben jetzt geantwortet, warum ich keinen MCVE in den Kommentar aufgenommen habe :) Ole Tange vor 5 Jahren 0
0
Ole Tange

Eine elegantere Antwort, die keine nutzlose Kopie auf der Festplatte puffert:

#!/usr/bin/perl   use threads; use threads::shared; use Thread::Queue;  my $done :shared;  my $DataQueue = Thread::Queue->new();  my @producers; for (@ARGV) { push @producers, threads->create('producer', $_); }  while($done <= $#ARGV) { # This blocks until $DataQueue->pending > 0  print $DataQueue->dequeue(); }  for (@producers) { $_->join(); }   sub producer { open(my $fh, "<", shift) || die; while(<$fh>) { $DataQueue->enqueue($_); } # Closing $fh blocks  # close $fh;  $done++; # Guard against race condition  $DataQueue->enqueue(""); }