// docgrouper/main.go (501 lines, 15 KiB, Go)

// Time sheet
// Previously ~1h
// March 13, 2024: 00:00-02:30
// March 19, 2024: 15:00-19:00
// March 23, 2024: 20:00-22:00
// April 02, 2024: 12:30-17:00
// April 04, 2024: 21:00-23:30
// April 05, 2024: 00:00-02:00
package main
import (
"bufio"
"flag"
"fmt"
"log/slog"
"os"
"path"
"runtime"
"slices"
"strconv"
"strings"
"sync"
"sync/atomic"
)
const (
// defaultSimilarityThreshold is the default minimum similarity required for
// two files to be considered related; it is the default value of the
// -threshold flag. Similarities are on the 0 to 1 scale produced by
// compareFiles. This value is arbitrary and could be adjusted based on the
// specific requirements of the problem.
defaultSimilarityThreshold = 0.5
// defaultDataFilePath is the default location where the file pool can be
// found; it is the default value of the -path flag.
defaultDataFilePath = "files"
)
// Command line options. They are package-level variables that run binds to
// flags and that other parts of the program read directly.
var (
// dataFilePath is the directory containing the file pool (-path).
dataFilePath string
// similarityThreshold is the minimum similarity for a file to be matched to
// a document (-threshold).
similarityThreshold float64
// useDocPrefix makes Document.String emit a "[doc ###]" prefix (-prefix).
useDocPrefix bool
// verbose enables the informational messages emitted through log (-verbose).
verbose bool
// numWorkers is the number of comparison workers to start (-workers).
numWorkers int
)
// main runs the document grouper with the process arguments and prints each
// identified document on its own line of standard output. If run fails, the
// error is written to standard error and the process exits with status 1.
func main() {
	documents, err := run(os.Args)
	if err != nil {
		fmt.Fprintf(os.Stderr, "error: %v\n", err)
		// os.Exit documents [0, 125] as the portable range of status codes;
		// the previous value of -1 falls outside it.
		os.Exit(1)
	}
	for _, doc := range documents {
		fmt.Println(doc)
	}
}
// run is the main entry point for the program. It parses the command line in
// args (args[0] is the program name), walks the file pool in timestamp order,
// and returns the identified documents sorted for output.
//
// For every timestamp, each known document is handed to the worker pool along
// with the files carrying that timestamp. Once all of those work items are
// done, every file that no document claimed becomes a new document.
func run(args []string) ([]*Document, error) {
	fs := flag.NewFlagSet(args[0], flag.ExitOnError)
	fs.StringVar(&dataFilePath, "path", defaultDataFilePath, "path to the file pool")
	fs.Float64Var(&similarityThreshold, "threshold", defaultSimilarityThreshold, "similarity threshold")
	fs.IntVar(&numWorkers, "workers", runtime.NumCPU()*2, "number of workers to use")
	fs.BoolVar(&useDocPrefix, "prefix", false, "use '[doc ###]' prefix for output")
	fs.BoolVar(&verbose, "verbose", false, "enable verbose logging")
	// The flag set is created with flag.ExitOnError, so the result of Parse is
	// deliberately discarded.
	_ = fs.Parse(args[1:])

	// Files must be processed in time order, so group them by the timestamp
	// they carry and obtain those timestamps in ascending order.
	filesByTime, sortedTimes, err := orderFiles(dataFilePath)
	if err != nil {
		return nil, err
	}

	manager := NewDocumentManager(dataFilePath, similarityThreshold, numWorkers)
	for idx, ts := range sortedTimes {
		// claimed records which files of this timestamp have been attached to
		// a document; anything missing from it afterwards needs a document of
		// its own. It is declared inside the loop so each timestamp starts
		// with an empty set.
		var claimed sync.Map
		// pending lets us block until every work item of this timestamp has
		// been handled, which must happen before new documents are created.
		var pending sync.WaitGroup

		candidates := filesByTime[ts]
		log("processing timestamp", "timestamp", ts, "timestampIndex", idx, "totalTimestamps", len(sortedTimes))

		for docIdx, doc := range manager.Documents {
			pending.Add(1)
			manager.WorkCh <- WorkItem{
				doc:             doc,
				fileNumbers:     candidates,
				timestamp:       ts,
				associatedFiles: &claimed,
				wg:              &pending,
			}
			log(
				"submitted work",
				"documentNumber", docIdx+1,
				"documentID", doc.ID,
				"totalDocs", len(manager.Documents),
				"timestamp", ts,
			)
		}
		pending.Wait()

		// The timestamp is fully processed: any file that has not been
		// associated with an existing document starts a new one.
		created := 0
		for _, fileNumber := range candidates {
			if _, ok := claimed.Load(fileNumber); ok {
				continue
			}
			manager.AddNewDocument(fileNumber, ts)
			created++
		}
		if created > 0 {
			log("created new documents", "numAdded", created, "timestamp", ts)
		}

		// Free up memory.
		manager.ShrinkCache()
	}

	manager.Shutdown()
	return manager.SortedDocuments(), nil
}
// WorkItem is what will be sent to the workers in the worker pool. Each item
// asks a worker to compare one document against the files of one timestamp.
type WorkItem struct {
// doc is the document to compare against; a matching file is appended to it.
doc *Document
// fileNumbers lists the candidate files to compare with doc.
fileNumbers []int
// timestamp is the timestamp recorded on doc when a file is associated.
timestamp int
// associatedFiles records the file numbers that have been matched to a
// document. run shares one map between all work items of a timestamp.
associatedFiles *sync.Map
// wg is signalled with Done once the worker has finished this item.
wg *sync.WaitGroup
}
// DocumentManager handles the processing of documents and files. It maintains a
// list of documents and a cache of file contents, and uses a pool of workers to
// compare documents against files.
type DocumentManager struct {
// Documents is the list of documents that have been identified.
Documents []*Document
// WorkCh is the channel through which work items are submitted to the workers.
// Closing it (see Shutdown) makes the workers exit.
WorkCh chan WorkItem
// docIDSource is a concurrency-safe source from which to identify documents.
// This could easily be something other than an integer, but using this allows
// us to just use the standard library.
docIDSource atomic.Uint32
// similarityThreshold is the minimum similarity at which a file is considered
// to belong to a document.
similarityThreshold float64
// fcc caches file contents so each file is not re-read for every comparison.
fcc *FileContentsCache
// wg tracks the running workers; Shutdown waits on it.
wg sync.WaitGroup
}
// NewDocumentManager creates a new DocumentManager that reads files from
// fileBasePath, matches files to documents at similarityThreshold or above,
// and starts numWorkers comparison workers listening on WorkCh.
//
// A numWorkers value below 1 is treated as 1: WorkCh is unbuffered, so with no
// workers every submission would block forever, and a negative count would
// panic in sync.WaitGroup.Add.
//
// Call Shutdown when finished to stop the workers.
func NewDocumentManager(fileBasePath string, similarityThreshold float64, numWorkers int) *DocumentManager {
	numWorkers = max(numWorkers, 1)

	dm := &DocumentManager{
		Documents:           make([]*Document, 0),
		similarityThreshold: similarityThreshold,
		fcc:                 &FileContentsCache{BaseDir: fileBasePath},
		WorkCh:              make(chan WorkItem),
	}

	// Register the workers before starting them so that a worker's Done can
	// never run ahead of the matching Add.
	dm.wg.Add(numWorkers)
	for wID := range numWorkers {
		go dm.ComparisonWorker(wID + 1)
	}
	return dm
}
// Shutdown stops the worker pool. Closing WorkCh ends each worker's receive
// loop; Shutdown then blocks until every worker has reported that it exited.
// No work may be submitted after Shutdown has been called.
func (dm *DocumentManager) Shutdown() {
	// Signal the workers that no more work is coming.
	close(dm.WorkCh)
	// Wait for all of them to drain and return.
	dm.wg.Wait()
}
// AddNewDocument starts a new document whose only associated file is
// fileNumber, records timestamp as its latest timestamp, and appends it to
// dm.Documents. The document ID is the next value of the manager's atomic
// counter.
func (dm *DocumentManager) AddNewDocument(fileNumber, timestamp int) {
	dm.Documents = append(dm.Documents, &Document{
		ID:              dm.docIDSource.Add(1),
		LatestTimestamp: timestamp,
		AssociatedFiles: []int{fileNumber},
	})
}
// ShrinkCache drops cached file contents that will never be needed again. Only
// the most recently associated file of each document is kept, since that is
// the file future comparisons are made against. It is not concurrency-safe and
// must only be called while nothing that could modify the document list is
// running. This is purely a memory optimization and could be removed if memory
// usage is not a concern.
func (dm *DocumentManager) ShrinkCache() {
	keep := make([]int, len(dm.Documents))
	for i, doc := range dm.Documents {
		keep[i] = doc.LatestAssociatedFile()
	}
	dm.fcc.ClearFilesExcept(keep)
}
// SortedDocuments returns the list of documents prepared for output: each
// document's associated files are sorted in ascending order, and the documents
// themselves are ordered by their first (lowest) associated file number.
//
// The sort is done in place on dm.Documents and on every document's file list,
// which destroys the temporal ordering of the files. It must therefore only be
// called once all processing has finished.
func (dm *DocumentManager) SortedDocuments() []*Document {
	// Sort the associated files for each document.
	for _, doc := range dm.Documents {
		doc.SortAssociatedFiles()
	}

	// Sort the documents by their first associated file number.
	slices.SortFunc(dm.Documents, func(a, b *Document) int {
		// Compare explicitly rather than subtracting: a difference of two ints
		// can overflow and report the wrong ordering.
		x, y := a.AssociatedFiles[0], b.AssociatedFiles[0]
		switch {
		case x < y:
			return -1
		case x > y:
			return 1
		default:
			return 0
		}
	})
	return dm.Documents
}
// ComparisonWorker is the body of one worker goroutine. It receives work items
// from dm.WorkCh until the channel is closed. For each item it compares the
// document's most recently associated file against the candidate files in
// order; the first candidate that meets the similarity threshold and has not
// been claimed by another document is associated with the document, after
// which the item is finished.
//
// workerID is only used to label log output. The work item's WaitGroup is
// signalled once per item, and dm.wg is signalled when the worker exits.
func (dm *DocumentManager) ComparisonWorker(workerID int) {
	// Report that this worker is shutting down.
	defer dm.wg.Done()

	for workItem := range dm.WorkCh {
		for _, fileNumber := range workItem.fileNumbers {
			// Cheap pre-check: skip the comparison entirely when the file has
			// already been matched.
			if _, ok := workItem.associatedFiles.Load(fileNumber); ok {
				continue
			}

			latestFileNumber := workItem.doc.LatestAssociatedFile()
			similarity, err := dm.compareFiles(latestFileNumber, fileNumber)
			if err != nil {
				// Simplistic error handling: log the error and move on to the
				// next file rather than treating a failed comparison as a
				// similarity of zero.
				slog.Error(
					"error comparing files",
					"file1", latestFileNumber,
					"file2", fileNumber,
					"document", workItem.doc.ID,
					"worker", workerID,
					"error", err,
				)
				continue
			}

			// If current file doesn't match current document, skip to the next file.
			if similarity < dm.similarityThreshold {
				continue
			}

			// Claim the file atomically. Another worker may have matched the
			// same file to its own document since the pre-check above; a
			// separate Load followed by Store would let both associate it.
			if _, claimed := workItem.associatedFiles.LoadOrStore(fileNumber, struct{}{}); claimed {
				continue
			}

			// Current file matches current document, so record this.
			workItem.doc.AssociateFile(fileNumber, workItem.timestamp)
			log(
				"match found",
				"document", workItem.doc.ID,
				"file", fileNumber,
				"time", workItem.timestamp,
				"worker", workerID,
			)

			// We don't need to consider this document anymore since we've found
			// a match. End processing and wait for more work.
			break
		}
		workItem.wg.Done()
	}
}
// compareFiles computes how much two files overlap, on a scale of 0 to 1. The
// result is the number of distinct non-blank lines present in both files
// divided by the number of distinct non-blank lines present in either file.
//
// Lines are deduplicated per file, so a line repeated inside a single file is
// neither mistaken for an overlap nor prevents a genuine overlap from being
// counted. If neither file contains a non-blank line the files are treated as
// identical and 1 is returned, which avoids a 0/0 (NaN) result.
//
// An error is returned if either file's contents cannot be obtained.
func (dm *DocumentManager) compareFiles(f1Number, f2Number int) (float64, error) {
	f1, err := dm.fcc.GetFileContents(f1Number)
	if err != nil {
		return 0, fmt.Errorf("file %d: %w", f1Number, err)
	}
	f2, err := dm.fcc.GetFileContents(f2Number)
	if err != nil {
		return 0, fmt.Errorf("file %d: %w", f2Number, err)
	}

	const inBoth = 0b11

	// presence maps each distinct line to a bit set: bit 0 is set when the
	// line occurs in f1, bit 1 when it occurs in f2.
	presence := make(map[string]uint8)
	for i, lines := range [][]string{f1, f2} {
		bit := uint8(1) << i
		for _, line := range lines {
			// Skip blank lines, which can throw off the count.
			if line == "" {
				continue
			}
			presence[line] |= bit
		}
	}

	if len(presence) == 0 {
		return 1, nil
	}

	var overlap int
	for _, bits := range presence {
		if bits == inBoth {
			overlap++
		}
	}
	return float64(overlap) / float64(len(presence)), nil
}
// Document stores a document ID and a list of associated files.
type Document struct {
// AssociatedFiles holds the numbers of the files belonging to this document.
// Files are appended as they are matched; SortAssociatedFiles later reorders
// them numerically for output.
AssociatedFiles []int
// LatestTimestamp is the timestamp passed with the most recent association.
LatestTimestamp int
// ID identifies the document; DocumentManager assigns it from an atomic
// counter.
ID uint32
}
// String renders the document as its associated file numbers separated by
// spaces. When the -prefix option is enabled, the list is preceded by
// "[doc <ID>]" with the ID right-aligned to a width of four.
func (d Document) String() string {
	var files strings.Builder
	for _, fileNumber := range d.AssociatedFiles {
		fmt.Fprintf(&files, "%d ", fileNumber)
	}

	if !useDocPrefix {
		// Returned exactly as built, i.e. including the space that follows the
		// last file number.
		return files.String()
	}
	return fmt.Sprintf("[doc %4d] %s", d.ID, strings.TrimSpace(files.String()))
}
// AssociateFile attaches fileNumber to the document by appending it to the
// list of associated files, and records timestamp as the document's latest
// timestamp.
func (d *Document) AssociateFile(fileNumber, timestamp int) {
	d.LatestTimestamp = timestamp
	d.AssociatedFiles = append(d.AssociatedFiles, fileNumber)
}
// LatestAssociatedFile returns the last entry of the document's associated
// file list. That entry is the most recent file only while the list is still
// in the temporal order in which files were associated, i.e. before
// SortAssociatedFiles has been called. The list must not be empty.
func (d Document) LatestAssociatedFile() int {
	last := len(d.AssociatedFiles) - 1
	return d.AssociatedFiles[last]
}
// SortAssociatedFiles puts the document's associated files into ascending
// numerical order, as the output requirements stipulate. The sort is in place
// and discards the original temporal order of the list, so it must only be
// invoked when the work is entirely finished.
func (d *Document) SortAssociatedFiles() {
	files := d.AssociatedFiles
	slices.Sort(files)
}
// FileContentsCache is a cache of file contents, keyed by file number,
// to avoid reading the same file from disk multiple times.
type FileContentsCache struct {
// BaseDir is the directory in which the numbered files are looked up.
BaseDir string
// cache maps a file number (int) to that file's lines ([]string), excluding
// the leading timestamp line.
cache sync.Map
}
// GetFileContents returns the lines of the file with the given number,
// excluding the first line, which holds the timestamp. If the file is already
// in the cache its contents are returned from there; otherwise the file is
// read from BaseDir, stored in the cache, and returned.
//
// An error is returned if the file cannot be opened or read; nothing is cached
// in that case. The returned slice is shared with the cache and must not be
// modified by the caller.
func (fcc *FileContentsCache) GetFileContents(fileNumber int) ([]string, error) {
	if contents, ok := fcc.cache.Load(fileNumber); ok {
		return contents.([]string), nil
	}

	f, err := os.Open(makeFilePath(fcc.BaseDir, fileNumber))
	if err != nil {
		return nil, err
	}
	// Release the file descriptor once the contents have been read. The file
	// is opened read-only, so the result of Close is not checked.
	defer f.Close()

	s := bufio.NewScanner(f)

	// Read first line and ignore it since it's just the timestamp. A read
	// failure here is reported by s.Err below.
	_ = s.Scan()

	// Read file and store contents in cache.
	var lines []string
	for s.Scan() {
		lines = append(lines, s.Text())
	}
	if err := s.Err(); err != nil {
		return nil, err
	}

	fcc.cache.Store(fileNumber, lines)
	return lines, nil
}
// ClearFilesExcept removes every entry from the cache except those whose file
// number appears in fileNumbers. This helps conserve memory by dropping the
// contents of files that are no longer of interest, which we can be sure of
// since we are proceeding in order of time.
func (fcc *FileContentsCache) ClearFilesExcept(fileNumbers []int) {
	// Index the numbers to keep so each cached entry is checked with a single
	// map lookup instead of a scan over the whole slice.
	keep := make(map[int]struct{}, len(fileNumbers))
	for _, n := range fileNumbers {
		keep[n] = struct{}{}
	}

	// Collect the entries to delete first and remove them afterwards, so the
	// map is not modified while it is being iterated.
	var toDelete []int
	fcc.cache.Range(func(key, _ any) bool {
		storedFileNum := key.(int)
		if _, ok := keep[storedFileNum]; !ok {
			toDelete = append(toDelete, storedFileNum)
		}
		return true
	})

	for _, k := range toDelete {
		fcc.cache.Delete(k)
	}
}
// readFileTime reads the first line of the file at filepath, which represents
// a time/version, and returns it as an integer. Whitespace surrounding the
// number is ignored.
//
// An error is returned if the file cannot be opened or read, or if the first
// line is not a valid integer (which includes an empty file). Read and parse
// errors name the offending file.
func readFileTime(filepath string) (int, error) {
	file, err := os.Open(filepath)
	if err != nil {
		return 0, err
	}
	defer file.Close()

	s := bufio.NewScanner(file)
	var firstLine string
	if s.Scan() {
		firstLine = strings.TrimSpace(s.Text())
	}
	if err := s.Err(); err != nil {
		return 0, fmt.Errorf("reading %s: %w", filepath, err)
	}

	ts, err := strconv.Atoi(firstLine)
	if err != nil {
		return 0, fmt.Errorf("invalid time %q in %s: %w", firstLine, filepath, err)
	}
	return ts, nil
}
// orderFiles scans dir for "<number>.txt" files, reads the timestamp stored on
// the first line of each, and groups the file numbers by that timestamp.
//
// It returns the timestamp-to-file-numbers map together with the timestamps in
// ascending order; the sorted slice exists because map iteration order is not
// defined. Walking the slice steps through the history of the files from the
// beginning, which is what allows a "chain" of evolution to be built for each
// document.
//
// Directories and entries without a ".txt" suffix are skipped. An error is
// returned if dir cannot be read, if a ".txt" file name is not an integer, or
// if a file's timestamp cannot be read.
func orderFiles(dir string) (map[int][]int, []int, error) {
	dirEntries, err := os.ReadDir(dir)
	if err != nil {
		return nil, nil, fmt.Errorf("reading directory %s: %w", dir, err)
	}

	timeMap := make(map[int][]int)
	for _, entry := range dirEntries {
		name := entry.Name()
		if entry.IsDir() || !strings.HasSuffix(name, ".txt") {
			continue
		}

		// The file number is the file name without its extension.
		fileNumber, err := strconv.Atoi(strings.TrimSuffix(name, ".txt"))
		if err != nil {
			return nil, nil, fmt.Errorf("invalid file number in file name %q: %w", name, err)
		}

		modTime, err := readFileTime(path.Join(dir, name))
		if err != nil {
			return nil, nil, err
		}

		// append allocates the slice on first use of a timestamp.
		timeMap[modTime] = append(timeMap[modTime], fileNumber)
	}

	// Collect the distinct timestamps and sort them so callers can iterate in
	// order of time.
	timeSlice := make([]int, 0, len(timeMap))
	for ts := range timeMap {
		timeSlice = append(timeSlice, ts)
	}
	slices.Sort(timeSlice)

	return timeMap, timeSlice, nil
}
// makeFileName returns the name of the file with the given number: the number
// in decimal followed by the ".txt" extension.
func makeFileName(number int) string {
	return fmt.Sprint(number) + ".txt"
}
// makeFilePath returns the path of the file with the given number inside the
// directory dataFilePath.
func makeFilePath(dataFilePath string, number int) string {
	name := makeFileName(number)
	return path.Join(dataFilePath, name)
}
// log emits msg with the given key/value args at info level through slog, but
// only when verbose logging is enabled; otherwise it does nothing.
func log(msg string, args ...any) {
	if !verbose {
		return
	}
	slog.Info(msg, args...)
}