#include #include #include #include #include #include #include #include #include #include #include #include #define CMD_MAX_LEN 1024 #define THREADS 10 #define PORT 1416 typedef struct BNDBUF BNDBUF; typedef struct SEM SEM; void die(const char *msg); struct params { SEM *active, *sync; BNDBUF *clients; }; // stores @value in the store associated with @key. Never fails! void kvs_add(const char *key, void *value); // returns the value associated with @key if it is in the store or NULL. void *kvs_get(const char *key); // calls @callback for every item in the key value store void kvs_iter(void (*callback)(const char *key, void *value)); // creates and returns a new ‘struct slot‘ reserved by @owner (see slot_own). // On error, errno is set and NULL returned. struct slot *slot_alloc(const char *owner); // atomically reserves @slot for @owner, or releases it if @owner is NULL. // On success, 0 is returned, otherwise -1 and errno is set appropriately. // This may be if the slot is already reserved and @owner is not NULL, or // already free if @owner is NULL, as well as related to required memory // allocations (@owner is dublicated). int slot_own(struct slot *slot, const char *owner); // creates a new semaphore with initial value of @count. // returns NULL on error and sets errno. SEM *semCreate(int count); void P(SEM *); void V(SEM *); // create a new bounded buffer of given @size. // returns NULL on error and sets errno BNDBUF *bbCreate(size_t size); // adds @value into @bb. Blocks if necessary and never fails. void bbPut(BNDBUF *bb, int value); // retrieves next value from @bb. Blocks if necessary and never fails. int bbGet(BNDBUF *bb); static void load(const char *path); static void *worker(void *arg); static int add_slot(const char *path); // Sucht in allen als Parameter übergebenen Pfaden nach reservierbaren Plätzen, // um diese im Arbeitsspeicher zu verwalten (load()). Anschließend werden die 10 // Arbeiterfäden (worker()) gestartet, sowie ein Socket zum Warten auf // eingehende Verbindungen geöffnet. Eingehende Verbindungen werden über einen // Ringpuffer zur Bearbeitung an die Arbeiterfäden gegeben. int main(int argc, char *argv[]) { if (argc < 2) { fprintf(stderr, "USAGE: %s [paths...]\n", argv[0]); return EXIT_FAILURE; } // Parameter verarbeiten for (int i = 1; i < argc; i++) { load(argv[i]); } // Netzwerkkommunikation aufsetzen int fd = socket(AF_INET6, SOCK_STREAM, 0); if (fd == -1) { die("socket"); } struct sockaddr_in6 sa = { .sin6_family = AF_INET6, .sin6_port = htons(PORT), .sin6_addr = in6addr_any }; if (bind(fd, (const struct sockaddr *)&sa, sizeof(sa))) { die("bind"); } if (listen(fd, SOMAXCONN)) { die("listen"); } // Thread-Pool aufsetzen struct params params = { .active = semCreate(THREADS), .sync = semCreate(1), .clients = bbCreate(2 * THREADS) }; if (!params.active || !params.sync || !params.clients) { die("init params"); } for (int i = 0; i < THREADS; ++i) { pthread_t tid; errno = pthread_create(&tid, NULL, worker, ¶ms); if (errno) { die("pthread_create"); } } if (sigaction(SIGPIPE, &(struct sigaction){.sa_handler = SIG_IGN}, NULL)) { die("sigaction"); } // Anfragen annehmen for (;;) { int cs = accept(fd, NULL, NULL); if (cs == -1) { perror("accept"); continue; } bbPut(params.clients, cs); } } // Prüft, ob path selbst eine Platzreservierung ist (reguläre Datei, deren Name // auf ".slot" endet), oder durchsucht path rekursiv, falls es sich um ein // Verzeichnis handelt. Platzreservierungen werden durch add_slot() behandelt. // Fehler in dieser Funktion führen mit einer aussagekräftigen Meldung zur // Beendigung des Programms. static void load(const char *path) { // Pfad-Type prüfen struct stat st; if (lstat(path, &st)) { die("stat"); } if (S_ISREG(st.st_mode)) { if (fnmatch("*.slot", path, 0)) { return; } if (add_slot(path)) { die("add_slot"); } } else if (S_ISDIR(st.st_mode)) { DIR *dir = opendir(path); if (!dir) { die("opendir"); } while (1) { errno = 0; struct dirent *ent = readdir(dir); if (!ent) { if (errno) { die("readdir"); } closedir(dir); break; } if (!strcmp(ent->d_name, ".") || !strcmp(ent->d_name, "..")) { continue; } char new_path[strlen(path) + strlen(ent->d_name) + 2]; strcpy(new_path, path); strcat(new_path, "/"); strcat(new_path, ent->d_name); load(new_path); } } } // Erstellt eine Datenstruktur (slot_alloc()) für die Platzreservierung, die zur // Datei path gehört, und fügt sie dem Key-Value-Store (kvs_add() hinzu. Im // Fehlerfall wird -1 zurückgegeben und der Grund in errno gesetzt, anderenfalls // 0. static int add_slot(const char *path) { char slot[strlen(path) - 5 + 1]; strncpy(slot, path, strlen(path) - 5); slot[strlen(path) - 5] = '\0'; FILE *f = fopen(path, "r"); // fclose fehlt if (!f) { return -1; } char owner[CMD_MAX_LEN + 1]; if (!fgets(owner, sizeof(owner), f)) { if (ferror(f)) { return -1; } struct slot *s = slot_alloc(NULL); if (!s) { return -1; } kvs_add(slot, s); return 0; } struct slot *s = slot_alloc(owner); if (!s) { return -1; } kvs_add(slot, s); return 0; } // Sichert die entsprechende Platzreservierung, kann daher als Callback-Funktion // für kvs_iter() verwendet werden. Dabei ist value vom Typ struct slot. Fehler // werden durch eine passende Meldung auf dem Fehlerausgabekanal behandelt, // sollen aber nicht zur Beendigung des Programms führen. static void save(const char *key, void *value) { // Aus Zeitgruenden nicht mehr geschafft. Hier muss man die entsprechende // .slot-Datei oeffnen, und den Owner reinschreiben. } // Erwartet als arg ein struct params und entnimmt Verbindungen aus dem // enthaltenen Ringpuffer. Anschließend werden die Befehle der Verbindung // zeilenweise eingelesen, in die Bestandteile zerteilt und an command // übergeben. Die zurückgegebe Zeichenkette wird dem Klienten als Antwort // geschickt, bevor die nächste Zeile bearbeitet wird. static void *worker(void *arg) { // Befehle auslesen und bearbeiten // Aus Zeitgruenden nicht mehr geschafft. Diese Funktion ist ziemlich // aehnlich zu der Thread-Funktion in der mother; mit der Beschreibung oben // sollte das nicht allzu schwierig zu implementieren sein. return NULL; } // Abhängig von den Werten von slot und owner wird der entsprechende Befehl // ausgeführt. Sind beide NULL, so soll der Arbeitsspeicherzustand durch // kvs_iter in die jeweiligen Dateien geschrieben werden. Hierbei muss // sichergestellt sein, dass andere Fäden mit Änderungen an den Reservierungen // warten, bis alles zurückgeschrieben wurde. Anderenfalls wird der // entsprechende slot im Key-Value-Store gesucht (kvs_get()) und die // Reservierung für owner ausgeführt (slot_own()). Im Erfolgsfall ist der // Rückgabewert "success", andern falls eine aussagekräftige, statische // Zeichenkette. const char *command(struct params *p, const char *slot, const char *owner) { if (slot != NULL) { // Reservierung verwalten P(p->active); struct slot *s = kvs_get(slot); if (!s) { V(p->active); return "slot not found"; } if (!slot_own(s, owner)) { V(p->active); return "failed to reserve slot"; } V(p->active); } else { // slot == NULL // Zustand synchronisieren P(p->sync); for (int i = 0; i < THREADS; i++) { P(p->active); } kvs_iter(save); for (int i = 0; i < THREADS; i++) { V(p->active); } V(p->sync); } return "success"; }