It is a common architectural design pattern where one process (the producer) generates files and stores them at a specific location, either within the current system or an external one. Another process (the consumer) then picks up these files and processes them.
There are various ways to design such an architecture; which one fits best depends on several factors.
First Approach:
One approach is to have the consumer run as a cron job, executing at regular intervals, such as every six hours. When the consumer starts processing, it will handle only the files that were available before it began. Any new files arriving while the consumer is running will be processed in the next scheduled run.
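For illustration, the six-hour schedule could be a single crontab entry; the binary path below is a hypothetical placeholder for wherever the compiled consumer lives:

```
# run the consumer at minute 0 of every sixth hour
0 */6 * * * /usr/local/bin/consumer
```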
However, certain scenarios require additional handling. For instance, what if the consumer fails to complete processing the current batch of files before the next scheduled run? In such cases, we could maintain a database table to track the cron job’s current state. If it is time for the next run but the previous run has not completed, the new run could be skipped to prevent overlapping processes.
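As a lighter-weight variant of that state tracking, the consumer can take an exclusive lock at startup and skip the run if the lock is already held. This is a minimal sketch using a lock file; the file name is an assumption, and note that a crash can leave a stale lock behind (a database row or flock(2) is more robust):

```go
package main

import (
	"errors"
	"fmt"
	"os"
)

// tryLock creates the lock file exclusively; it fails if a previous
// run is still holding it. The returned function releases the lock.
func tryLock(path string) (release func(), err error) {
	f, err := os.OpenFile(path, os.O_CREATE|os.O_EXCL|os.O_WRONLY, 0o644)
	if err != nil {
		if errors.Is(err, os.ErrExist) {
			return nil, fmt.Errorf("previous run still in progress")
		}
		return nil, err
	}
	f.Close()
	return func() { os.Remove(path) }, nil
}

func main() {
	// hypothetical lock path; a real deployment might use a path under /var/run
	release, err := tryLock("consumer.lock")
	if err != nil {
		fmt.Println("skipping this run:", err)
		return
	}
	defer release()
	fmt.Println("processing batch")
}
```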
Second Approach:
Another approach is to implement file listeners or file watchers for the directory where the producer is dumping the files. These file listeners continuously monitor the directory for new files or changes to existing files. As soon as a new file arrives or an existing file is modified, the listener picks it up and starts processing it.
The advantage of this approach is that files are processed immediately upon arrival, and no database table is needed to track a cron job's state, as no cron job is required in this setup. The trade-off is that the process must run continuously.
Again, which approach to choose depends on your use case.
Project Setup:
In this article, we will consider a scenario where a producer pushes .txt files to a directory, and a consumer processes only the .txt files. Any file other than a .txt file will be ignored and moved to an unknown directory. For simplicity, we will print the name of each .txt file and then move it to the read directory.
We will focus on building only the consumer using the second approach by implementing file listeners in Go. Let us start by creating a module:
$ go mod init gowatcher
Next, let us create the files and directories required for this project:
$ touch main.go
$ mkdir unread read unknown
We need three directories: unread, where the producer will dump the files; read, where files will be moved after processing; and unknown, where non-.txt files will be moved, as we are only expecting .txt files.
Next, let us install the fsnotify package, which will handle the heavy lifting of watching the directory and emitting events.
$ go get github.com/fsnotify/fsnotify
The project’s file structure will look like this:
go-watcher/
├── read
├── unknown
├── unread
├── go.mod
└── main.go
Implementing the listener
The fsnotify package provides the NewWatcher function, which returns a watcher (or listener). This watcher monitors a specified directory and triggers events. We will point the watcher at the unread directory so it can detect any new files added by the producer.
The core idea is that when a new file is added to the watched directory, the watcher emits an fsnotify.Create event. When this event is detected, we will print the file name and move it to the read directory.
Let us put this all together in the main.go file:
package main

import (
    "log"
    "os"
    "path/filepath"
    "strings"

    "github.com/fsnotify/fsnotify"
)

// moveFile moves a file to the specified destination directory
func moveFile(srcPath, destDir string) error {
    // ensure the destination directory exists
    if err := os.MkdirAll(destDir, os.ModePerm); err != nil {
        return err
    }
    // construct the destination path
    destPath := filepath.Join(destDir, filepath.Base(srcPath))
    // move the file
    if err := os.Rename(srcPath, destPath); err != nil {
        return err
    }
    log.Printf("moved file %s to %s directory", srcPath, destDir)
    return nil
}

func main() {
    // create a new watcher
    watcher, err := fsnotify.NewWatcher()
    if err != nil {
        log.Fatalf("failed to create watcher: %v", err)
    }
    defer watcher.Close()

    unreadDir := "unread"
    readDir := "read"
    unknownDir := "unknown"

    // start listening for events
    go func() {
        for {
            select {
            case event, ok := <-watcher.Events:
                if !ok {
                    return
                }
                // check if the event is a create event
                if event.Has(fsnotify.Create) {
                    // get info about the item created in the "unread" directory
                    info, err := os.Stat(event.Name)
                    if err != nil {
                        log.Printf("error getting the item info for %s: %v", event.Name, err)
                        continue
                    }
                    // process .txt files; move everything else to the "unknown" directory
                    if !info.IsDir() && strings.HasSuffix(info.Name(), ".txt") {
                        log.Printf("processing .txt file: %s", info.Name())
                        if err := moveFile(event.Name, readDir); err != nil {
                            log.Printf("failed to move file %s to 'read' directory: %v", event.Name, err)
                        }
                    } else {
                        if err := moveFile(event.Name, unknownDir); err != nil {
                            log.Printf("failed to move file %s to 'unknown' directory: %v", event.Name, err)
                        }
                    }
                }
            case err, ok := <-watcher.Errors:
                if !ok {
                    return
                }
                log.Printf("watcher error: %v", err)
            }
        }
    }()

    // add the "unread" directory to the watcher
    if err := watcher.Add(unreadDir); err != nil {
        log.Fatalf("failed to add path to watcher: %v", err)
    }

    // block the main goroutine indefinitely
    <-make(chan struct{})
}
Since we want this to function as a continuously running service, we will prevent the main goroutine from exiting.
Let’s run the project with go run:
$ go run main.go
We see our project is running and not exiting, which is a good sign. Let’s drop a few files in the unread directory and see what happens:
2025/01/23 17:58:59 processing .txt file: file1.txt
2025/01/23 17:58:59 moved file unread/file1.txt to read directory
2025/01/23 17:58:59 processing .txt file: file2.txt
2025/01/23 17:58:59 moved file unread/file2.txt to read directory
2025/01/23 17:58:59 processing .txt file: file3.txt
2025/01/23 17:58:59 moved file unread/file3.txt to read directory
2025/01/23 17:58:59 processing .txt file: file4.txt
2025/01/23 17:58:59 moved file unread/file4.txt to read directory
We see that the files are correctly moved to the read directory. Similarly, if we drop a non-.txt file, it will be moved to the unknown directory.
Now, I have 200k files to drop into unread. Should I go ahead?
Not quite yet. Our listener processes events sequentially in a single goroutine, so a burst of 200k files will queue up and be handled one at a time, and the kernel’s event queue can overflow under such a burst, silently dropping events. The tempting fix of spawning a goroutine per file would create nearly 200k goroutines, causing its own performance issues.
There are ways to control concurrency. If you’re familiar with semaphores, I would recommend implementing one to limit the number of concurrent goroutines.
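The semaphore idea can be sketched with a buffered channel: each worker goroutine acquires a slot before doing its work and releases it afterwards, so at most a fixed number run concurrently. The processAll helper and its file names below are illustrative, not part of the watcher code above:

```go
package main

import (
	"fmt"
	"sync"
)

// processAll handles each file in its own goroutine, but a buffered
// channel used as a counting semaphore caps how many run at once.
func processAll(files []string, limit int) int {
	sem := make(chan struct{}, limit)
	var wg sync.WaitGroup
	var mu sync.Mutex
	processed := 0
	for _, f := range files {
		wg.Add(1)
		go func(name string) {
			defer wg.Done()
			sem <- struct{}{}        // acquire a slot; blocks when the limit is reached
			defer func() { <-sem }() // release the slot
			// placeholder for real work, e.g. moveFile(name, "read")
			mu.Lock()
			processed++
			mu.Unlock()
		}(f)
	}
	wg.Wait()
	return processed
}

func main() {
	files := []string{"a.txt", "b.txt", "c.txt", "d.txt", "e.txt"}
	fmt.Printf("processed %d files with at most 2 in flight\n", processAll(files, 2))
}
```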
Lastly, I want to highlight the deployment of this service. There are several ways to deploy it; one option is to compile the code and register the binary as a systemd service, which would allow it to run in the background and be easily managed.
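As a sketch, a minimal unit file might look like the following; the binary and directory paths are assumptions and need adjusting to your environment:

```
[Unit]
Description=Go file watcher consumer
After=network.target

[Service]
# hypothetical paths; adjust to where the binary and directories live
ExecStart=/usr/local/bin/gowatcher
WorkingDirectory=/opt/gowatcher
Restart=on-failure

[Install]
WantedBy=multi-user.target
```

Saving this as /etc/systemd/system/gowatcher.service, then running systemctl daemon-reload followed by systemctl enable --now gowatcher, would start the watcher and restart it on failure.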