docgrouper/main.go

package main

import (
	"bufio"
	"flag"
	"fmt"
	"log/slog"
	"math"
	"os"
	"path"
	"runtime"
	"runtime/pprof"
	"slices"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"
)

const (
	// defaultSimilarityThreshold is the default minimum similarity required for
	// two files to be considered related. This value is arbitrary and could be
	// adjusted based on the specific requirements of the problem.
	defaultSimilarityThreshold = 0.5

	// defaultDataFilePath describes the default location where the file pool
	// can be found.
	defaultDataFilePath = "files"
)

// Command-line options.
var (
	dataFilePath        string
	similarityThreshold float64
	outputFile          string
	useDocPrefix        bool
	verbose             bool
	numWorkers          int
)

func main() {
	var (
		start   = time.Now()
		padding = "\n"
	)

	// CPU profiling is always on; the profile is written to cpu.prof in the
	// current directory. Note that os.Exit bypasses deferred calls, so no
	// profile is written if run fails.
	f, err := os.Create("cpu.prof")
	if err != nil {
		panic(err)
	}
	defer f.Close()
	if err := pprof.StartCPUProfile(f); err != nil {
		panic(err)
	}
	defer pprof.StopCPUProfile()

	documents, output, err := run(os.Args)
	if err != nil {
		fmt.Fprintf(os.Stderr, "error: %v\n", err)
		os.Exit(1)
	}
	if output != os.Stdout {
		defer output.Close()
		padding = ""
	}
	duration := time.Since(start)

	v := make(Visualizer)
	for _, doc := range documents {
		fmt.Fprintln(output, doc)
		v.Add(*doc)
	}
	fmt.Fprintf(
		os.Stderr,
		"%s%d documents identified in %s\n\n%s\n",
		padding,
		len(documents),
		duration.Truncate(time.Millisecond),
		v.Render("Distribution for Number of Associated Files per Document", 60),
	)
}

// run is the main entry point for the program.
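// A typical invocation looks like this (paths and values are illustrative):
//
//	docgrouper -path ./files -threshold 0.6 -workers 8 -output groups.txt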
func run(args []string) ([]*Document, *os.File, error) {
	flags := flag.NewFlagSet(args[0], flag.ExitOnError)
	flags.StringVar(&dataFilePath, "path", defaultDataFilePath, "path to the file pool")
	flags.Float64Var(&similarityThreshold, "threshold", defaultSimilarityThreshold, "similarity threshold")
	flags.StringVar(&outputFile, "output", "", "output file (default is stdout)")
	flags.IntVar(&numWorkers, "workers", runtime.NumCPU()*2, "number of workers to use")
	flags.BoolVar(&useDocPrefix, "prefix", false, "use '[doc ###]' prefix for output")
	flags.BoolVar(&verbose, "verbose", false, "enable verbose logging")
	_ = flags.Parse(args[1:])

	output := os.Stdout
	if outputFile != "" {
		var err error
		output, err = os.Create(outputFile)
		if err != nil {
			return nil, nil, fmt.Errorf("creating output file: %w", err)
		}
	}
	shouldReportStatus := !verbose && output != os.Stdout
	if shouldReportStatus {
		defer fmt.Fprintln(os.Stderr)
	}

	// The files need to be processed in order of time, so determine the
	// timestamp of each file and sort them by time.
	fileTimes, times, err := orderFiles(dataFilePath, shouldReportStatus)
	if err != nil {
		return nil, nil, err
	}

	dm := NewDocumentManager(dataFilePath, similarityThreshold, numWorkers)
	for i, timestamp := range times {
		var (
			// Track the files at this timestamp that have been associated with
			// documents, so we can identify unassociated files later and then
			// create new documents for them. This needs to be distinct for each
			// timestamp, so it's created inside the timestamp loop's scope.
			claimedFiles sync.Map
			// We might need to create new documents for files that weren't
			// associated with any document at this timestamp, so we need to
			// make sure that this timestamp has been entirely processed first.
			// We do this by waiting for the workers to indicate they've
			// finished a work item.
			wg sync.WaitGroup
		)
		log("processing timestamp", "timestamp", timestamp, "timestampIndex", i, "totalTimestamps", len(times))
		if shouldReportStatus {
			fmt.Fprintf(os.Stderr, "\rProcessing timestamp %d of %d...", i+1, len(times))
		}
		for docIdx, doc := range dm.Documents {
			wg.Add(1)
			dm.WorkCh <- WorkItem{
				doc:          doc,
				fileNumbers:  fileTimes[timestamp],
				timestamp:    timestamp,
				claimedFiles: &claimedFiles,
				wg:           &wg,
			}
			log(
				"submitted work",
				"documentNumber", docIdx+1,
				"documentID", doc.ID,
				"totalDocs", len(dm.Documents),
				"timestamp", timestamp,
			)
		}
		wg.Wait()

		// Now that this timestamp has been fully processed, we can check which
		// files haven't been associated with existing documents, and create
		// new documents for them.
		var docsAdded int
		for _, fileNumber := range fileTimes[timestamp] {
			if _, ok := claimedFiles.Load(fileNumber); !ok {
				dm.AddNewDocument(fileNumber, timestamp)
				docsAdded++
			}
		}
		if docsAdded > 0 {
			log("created new documents", "numAdded", docsAdded, "timestamp", timestamp)
		}
		// Free up memory.
		dm.ShrinkCache()
	}
	dm.Shutdown()
	return dm.SortedDocuments(), output, nil
}

// WorkItem is what will be sent to the workers in the worker pool.
type WorkItem struct {
	doc          *Document
	fileNumbers  []int
	timestamp    int
	claimedFiles *sync.Map
	wg           *sync.WaitGroup
}

// DocumentManager handles the processing of documents and files. It maintains a
// list of documents and a cache of file contents, and uses a pool of workers to
// compare documents against files.
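//
// A sketch of the intended lifecycle (values are illustrative):
//
//	dm := NewDocumentManager("files", 0.5, 8)
//	dm.WorkCh <- WorkItem{ /* one item per document per timestamp */ }
//	dm.Shutdown()
//	docs := dm.SortedDocuments()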
type DocumentManager struct {
	// Documents is the list of documents that have been identified.
	Documents []*Document
	// WorkCh is the channel through which work items are submitted to the
	// workers.
	WorkCh chan WorkItem
	// docIDSource is a concurrency-safe source from which to identify
	// documents. This could easily be something other than an integer, but
	// using this allows us to just use the standard library.
	docIDSource         atomic.Uint32
	similarityThreshold float64
	fcc                 *FileContentsCache
	wg                  sync.WaitGroup
}

// NewDocumentManager creates a new DocumentManager with the specified base path
// for the file pool and the specified number of workers.
func NewDocumentManager(fileBasePath string, similarityThreshold float64, numWorkers int) *DocumentManager {
	dm := &DocumentManager{
		Documents:           make([]*Document, 0),
		similarityThreshold: similarityThreshold,
		fcc:                 &FileContentsCache{BaseDir: fileBasePath},
		WorkCh:              make(chan WorkItem),
	}
	// Register the workers before starting them, so Shutdown cannot observe a
	// zero WaitGroup while workers are still launching.
	dm.wg.Add(numWorkers)
	for wID := range numWorkers {
		go dm.ComparisonWorker(wID + 1)
	}
	return dm
}

// Shutdown cleans up the document manager by closing the work channel to
// trigger workers to exit and then waits for all workers to exit.
func (dm *DocumentManager) Shutdown() {
	close(dm.WorkCh)
	dm.wg.Wait()
}

// AddNewDocument creates a document seeded with a single file. It is not
// concurrency-safe, so it must only be called between timestamps, while no
// workers are processing work items.
func (dm *DocumentManager) AddNewDocument(fileNumber, timestamp int) {
	doc := Document{
		ID:              dm.docIDSource.Add(1),
		LatestTimestamp: timestamp,
		AssociatedFiles: []int{fileNumber},
	}
	dm.Documents = append(dm.Documents, &doc)
}

// ShrinkCache removes files from the cache that will never be used again, by
// evicting those files that are not associated with any document. Note that
// this is not concurrent-safe and should only be called when operations that
// could modify the document list are not running. This is an optimization, but
// could be removed if memory usage is not a concern.
func (dm *DocumentManager) ShrinkCache() {
	var latestDocumentFiles []int
	for _, doc := range dm.Documents {
		latestDocumentFiles = append(latestDocumentFiles, doc.LatestAssociatedFile())
	}
	dm.fcc.ClearFilesExcept(latestDocumentFiles)
}

// SortedDocuments returns the documents ordered by their first associated file
// number, with each document's associated files sorted in ascending order.
func (dm *DocumentManager) SortedDocuments() []*Document {
	// Sort the associated files for each document.
	for _, doc := range dm.Documents {
		doc.SortAssociatedFiles()
	}
	// Sort the documents by their first associated file number.
	slices.SortFunc(dm.Documents, func(a, b *Document) int {
		return a.AssociatedFiles[0] - b.AssociatedFiles[0]
	})
	return dm.Documents
}

// ComparisonWorker is a function that receives work items describing a document
// and a list of candidate file IDs to compare against. It will compare the
// document against each file and if a match is found, associate the file with the
// document sent in the work item, and record the file as having been matched.
func (dm *DocumentManager) ComparisonWorker(workerID int) {
	defer dm.wg.Done()
	for workItem := range dm.WorkCh {
		dm.maybeAssociateFileWithDocument(workItem, workerID)
	}
}

func (dm *DocumentManager) maybeAssociateFileWithDocument(workItem WorkItem, workerID int) {
	defer workItem.wg.Done()
	for _, candidateFileNumber := range workItem.fileNumbers {
		if _, ok := workItem.claimedFiles.Load(candidateFileNumber); ok {
			// This file has already been matched with another document, so
			// skip it.
			continue
		}
		latestFileNumber := workItem.doc.LatestAssociatedFile()
		similarity, err := dm.compareFiles(latestFileNumber, candidateFileNumber)
		if err != nil {
			// Simplistic error handling: log the error and move on to the next
			// candidate.
			slog.Error(
				"error comparing files",
				"error", err,
				"latestAssociatedFile", latestFileNumber,
				"candidateFile", candidateFileNumber,
				"document", workItem.doc.ID,
				"worker", workerID,
			)
			continue
		}
		// If the candidate matches this document, try to claim it. Another
		// worker may match the same file for a different document at the same
		// time, so the claim must be atomic: only the first claimant gets the
		// file.
		if similarity >= dm.similarityThreshold {
			if _, claimed := workItem.claimedFiles.LoadOrStore(candidateFileNumber, struct{}{}); claimed {
				continue
			}
			workItem.doc.AssociateFile(candidateFileNumber, workItem.timestamp)
			log(
				"match found",
				"document", workItem.doc.ID,
				"file", candidateFileNumber,
				"time", workItem.timestamp,
				"worker", workerID,
			)
			return
		}
	}
}

// compareFiles computes how much two files overlap on a scale of 0 to 1 by
// iterating through the files and calculating a similarity score that's based
// on the number of line-centric differences between the contents of the two
// files.
func (dm *DocumentManager) compareFiles(f1Number, f2Number int) (float64, error) {
	f1, err := dm.fcc.GetFileLineID(f1Number)
	if err != nil {
		return 0, fmt.Errorf("file %d: %w", f1Number, err)
	}
	f2, err := dm.fcc.GetFileLineID(f2Number)
	if err != nil {
		return 0, fmt.Errorf("file %d: %w", f2Number, err)
	}
	return compareFileLineIDs(f1, f2), nil
}

// Document stores a document ID and a list of associated files.
type Document struct {
	AssociatedFiles []int
	LatestTimestamp int
	ID              uint32
}

// String formats a document for output in the format described in the
// requirements.
func (d Document) String() string {
	var sb strings.Builder
	for _, f := range d.AssociatedFiles {
		sb.WriteString(fmt.Sprintf("%d ", f))
	}
	if useDocPrefix {
		return fmt.Sprintf("[doc %4d] %s", d.ID, strings.TrimSpace(sb.String()))
	}
	return sb.String()
}

// AssociateFile adds a file number to the list of associated files for a
// document, and also records the latest timestamp now associated with the
// document.
func (d *Document) AssociateFile(fileNumber, timestamp int) {
	d.AssociatedFiles = append(d.AssociatedFiles, fileNumber)
	d.LatestTimestamp = timestamp
}

// LatestAssociatedFile returns the most recent file associated with a document.
// Note that this presumes that the list of associated files is sorted in
// temporal order based on the timestamp at the head of the file.
func (d Document) LatestAssociatedFile() int {
	return d.AssociatedFiles[len(d.AssociatedFiles)-1]
}

// SortAssociatedFiles sorts the list of associated files for a document, since
// the requirements stipulate output in ascending numerical order. Note that
// this changes the order of associated files from their original temporal
// order, so must only be invoked when the work is entirely finished.
func (d *Document) SortAssociatedFiles() {
	slices.Sort(d.AssociatedFiles)
}

// FileContentsCache is a cache of file contents, keyed by file number,
// to avoid reading the same file from disk multiple times.
type FileContentsCache struct {
	BaseDir string
	// cache maps a file number to the file's content lines ([]string).
	cache sync.Map
	// lineIDs maps a file number to the file's sorted line IDs ([]uint64).
	lineIDs sync.Map
}

// GetFileContents returns the contents of a file, excluding the first timestamp
// line. If the file is already in the cache, the contents are returned from
// there, otherwise the file is read from disk and the contents are cached.
func (fcc *FileContentsCache) GetFileContents(fileNumber int) ([]string, error) {
	if contents, ok := fcc.cache.Load(fileNumber); ok {
		return contents.([]string), nil
	}
	var (
		fileName = makeFilePath(fcc.BaseDir, fileNumber)
		lines    []string
	)
	f, err := os.Open(fileName)
	if err != nil {
		return nil, err
	}
	defer f.Close()
	s := bufio.NewScanner(f)
	// Read the first line and ignore it, since it's just the timestamp.
	_ = s.Scan()
	// Read the rest of the file and store the contents in the cache.
	for s.Scan() {
		lines = append(lines, s.Text())
	}
	if err := s.Err(); err != nil {
		return nil, err
	}
	fcc.cache.Store(fileNumber, lines)
	return lines, nil
}

// GetFileLineID returns the sorted line IDs for a file, computing and caching
// them from the file's contents on first use.
func (fcc *FileContentsCache) GetFileLineID(fileNumber int) ([]uint64, error) {
	if cachedLineIDs, ok := fcc.lineIDs.Load(fileNumber); ok {
		return cachedLineIDs.([]uint64), nil
	}
	lines, err := fcc.GetFileContents(fileNumber)
	if err != nil {
		return nil, fmt.Errorf("adding line IDs for file %d: %w", fileNumber, err)
	}
	lineIDs := makeFileLinesID(lines)
	fcc.lineIDs.Store(fileNumber, lineIDs)
	return lineIDs, nil
}

var (
	// fileLinesCache maps a line's text to its assigned ID, shared across all
	// files so that identical lines compare equal.
	fileLinesCache sync.Map
	// fileLineIDSource hands out new line IDs.
	fileLineIDSource atomic.Uint64
)

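// makeFileLinesID converts a file's lines into integer line IDs, so that later
// comparisons operate on cheap uint64s rather than strings. The result is
// sorted, which is what allows compareFileLineIDs to use a linear merge.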
func makeFileLinesID(fileLines []string) []uint64 {
	fileLineIDs := make([]uint64, 0, len(fileLines))
	for _, line := range fileLines {
		lineID, ok := fileLinesCache.Load(line)
		if !ok {
			// Two workers can race to register the same line. LoadOrStore
			// makes the registration atomic, so both end up with the same ID
			// (at the cost of occasionally burning an unused ID).
			newID := fileLineIDSource.Add(1)
			lineID, _ = fileLinesCache.LoadOrStore(line, newID)
		}
		fileLineIDs = append(fileLineIDs, lineID.(uint64))
	}
	slices.Sort(fileLineIDs)
	return fileLineIDs
}

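// compareFileLineIDs computes the similarity of two sorted slices of line IDs
// with a single merge pass: matching IDs count toward the intersection, and
// every element consumed counts toward the union, giving a Jaccard-style
// ratio. A worked example (the IDs are hypothetical):
//
//	f1 = [1 2 3 5], f2 = [2 3 4]
//	matches: 2 and 3      -> similarity = 2
//	elements consumed: 5  -> count = 5 (the size of the union)
//	result: 2.0 / 5.0 = 0.4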
func compareFileLineIDs(f1, f2 []uint64) float64 {
	var (
		i, j       = 0, 0
		count      int
		similarity int
	)
	for i < len(f1) && j < len(f2) {
		count++
		if f1[i] == f2[j] {
			similarity++
			i++
			j++
		} else if f1[i] < f2[j] {
			i++
		} else {
			j++
		}
	}
	count += len(f1) - i + len(f2) - j
	if count == 0 {
		return 0
	}
	return float64(similarity) / float64(count)
}

// ClearFilesExcept removes the contents of the fileContentsCache except for the
// provided file numbers. This helps conserve memory by removing the contents of
// files that are no longer of interest, which we can be sure of since we are
// proceeding in order of time.
func (fcc *FileContentsCache) ClearFilesExcept(fileNumbers []int) {
	// Build up a list of entries to delete to avoid modifying the concurrent
	// map while iterating over it.
	var toDelete []int
	fcc.cache.Range(func(key, _ any) bool {
		storedFileNum := key.(int)
		if !slices.Contains(fileNumbers, storedFileNum) {
			toDelete = append(toDelete, storedFileNum)
		}
		return true
	})
	for _, k := range toDelete {
		fcc.cache.Delete(k)
	}
}

// readFileTime reads the first line of the file, which represents a
// time/version. The integer value will be returned.
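// For example, a pool file that begins (contents are hypothetical)
//
//	1375
//	first line of content
//	second line of content
//
// has time 1375; everything after the first line is document content.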
func readFileTime(filepath string) (int, error) {
	file, err := os.Open(filepath)
	if err != nil {
		return 0, err
	}
	defer file.Close()
	s := bufio.NewScanner(file)
	var firstLine string
	if s.Scan() {
		firstLine = s.Text()
	}
	if err := s.Err(); err != nil {
		return 0, err
	}
	t, err := strconv.Atoi(firstLine)
	if err != nil {
		return 0, fmt.Errorf("invalid time %s: %w", firstLine, err)
	}
	return t, nil
}

// orderFiles determines the timestamp version of each file and creates a map of
// time to file numbers. It sorts the times (since maps are not ordered) so that
// the map can be iterated in order of time. This allows stepping through the
// history of the files from the beginning. Using this, we can construct a
// "chain" of evolution for a given document.
func orderFiles(dir string, shouldReportStatus bool) (map[int][]int, []int, error) {
	if shouldReportStatus {
		defer fmt.Fprintln(os.Stderr)
	}
	timeMap := make(map[int][]int)
	dirEntries, err := os.ReadDir(dir)
	if err != nil {
		return nil, nil, fmt.Errorf("reading directory %s: %w", dir, err)
	}
	for i, entry := range dirEntries {
		if shouldReportStatus {
			fmt.Fprintf(os.Stderr, "\rReading directory %d of %d...", i+1, len(dirEntries))
		}
		if entry.IsDir() || !strings.HasSuffix(entry.Name(), ".txt") {
			continue
		}
		// Get the numeric representation of the file number.
		numberStr := strings.TrimSuffix(entry.Name(), ".txt")
		fileNumber, err := strconv.Atoi(numberStr)
		if err != nil {
			return nil, nil, fmt.Errorf("invalid file number in file name %q: %w", entry.Name(), err)
		}
		filePath := path.Join(dir, entry.Name())
		modTime, err := readFileTime(filePath)
		if err != nil {
			return nil, nil, err
		}
		timeMap[modTime] = append(timeMap[modTime], fileNumber)
	}
	// Now make a slice of the times and sort them, so we can iterate through
	// them in order.
	timeSlice := make([]int, 0, len(timeMap))
	for k := range timeMap {
		timeSlice = append(timeSlice, k)
	}
	slices.Sort(timeSlice)
	return timeMap, timeSlice, nil
}

// Visualizer is a utility to provide insight into the shape of the data
// processed.
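//
// The rendered histogram looks roughly like this (the data is hypothetical and
// the width abridged):
//
//	Distribution for Number of Associated Files per Document
//	========================================
//	  3 | ********************** (52.0%)
//	  1 | ************ (28.0%)
//	  5 | ********* (20.0%)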
type Visualizer map[int]int

// Add records a document in the histogram, keyed by its number of associated
// files.
func (v Visualizer) Add(d Document) {
	numAssocFiles := len(d.AssociatedFiles)
	v[numAssocFiles]++
}

// Render draws a horizontal-bar histogram of the distribution, scaled so that
// the most common bucket fills the available width.
func (v Visualizer) Render(title string, width int) string {
	if len(v) == 0 {
		return ""
	}
	type pair struct {
		numAssocFiles           int
		numDocsWithThisNumFiles int
	}
	var (
		slicedMap    []pair
		totalNumDocs int
	)
	for naf, nd := range v {
		slicedMap = append(slicedMap, pair{numAssocFiles: naf, numDocsWithThisNumFiles: nd})
		totalNumDocs += nd
	}
	// Order the buckets from most to least common.
	slices.SortFunc(slicedMap, func(a, b pair) int {
		return b.numDocsWithThisNumFiles - a.numDocsWithThisNumFiles
	})
	var sb strings.Builder
	sb.WriteString(title)
	sb.WriteRune('\n')
	sb.WriteString(strings.Repeat("=", width))
	sb.WriteRune('\n')
	// The 14 columns reserved below account for each row's label and trailing
	// percentage.
	scaleFactor := float64(totalNumDocs) / float64(slicedMap[0].numDocsWithThisNumFiles)
	for _, p := range slicedMap {
		ratio := float64(p.numDocsWithThisNumFiles) / float64(totalNumDocs)
		numChars := int(math.Ceil(ratio * float64(width-14) * scaleFactor))
		sb.WriteString(fmt.Sprintf(
			"%3d | %s (%.1f%%)\n",
			p.numAssocFiles,
			strings.Repeat("*", numChars),
			float64(p.numDocsWithThisNumFiles)*100/float64(totalNumDocs),
		))
	}
	return strings.TrimSuffix(sb.String(), "\n")
}

// makeFileName builds the name of a numbered pool file, e.g. "42.txt".
func makeFileName(number int) string {
	return fmt.Sprintf("%d.txt", number)
}

// makeFilePath builds the full path to a numbered file in the pool.
func makeFilePath(dataFilePath string, number int) string {
	return path.Join(dataFilePath, makeFileName(number))
}

// log emits an info-level message via slog when verbose logging is enabled.
func log(msg string, args ...any) {
	if verbose {
		slog.Info(msg, args...)
	}
}