package main

import (
	"bufio"
	"flag"
	"fmt"
	"log/slog"
	"math"
	"os"
	"path"
	"runtime"
	"runtime/pprof"
	"slices"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"
)

const (
	// defaultSimilarityThreshold is the default minimum similarity required for
	// two files to be considered related. This value is arbitrary and could be
	// adjusted based on the specific requirements of the problem.
	defaultSimilarityThreshold = 0.5

	// defaultDataFilePath is the default location where the file pool can be
	// found.
	defaultDataFilePath = "files"
)

// Command line options.
var (
	dataFilePath        string
	similarityThreshold float64
	outputFile          string
	useDocPrefix        bool
	verbose             bool
	numWorkers          int
)

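// main starts a CPU profile (always written to cpu.prof in the working
// directory), runs the analysis, writes each identified document to the
// selected output, and prints a timing summary and histogram to stderr.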
func main() {
	var (
		start   = time.Now()
		padding = "\n"
	)

	f, err := os.Create("cpu.prof")
	if err != nil {
		panic(err)
	}
	defer f.Close()
	if err := pprof.StartCPUProfile(f); err != nil {
		panic(err)
	}
	defer pprof.StopCPUProfile()

	documents, output, err := run(os.Args)
	if err != nil {
		fmt.Fprintf(os.Stderr, "error: %v\n", err)
		os.Exit(1)
	}
	if output != os.Stdout {
		defer output.Close()
		padding = ""
	}
	duration := time.Since(start)

	v := make(Visualizer)
	for _, doc := range documents {
		fmt.Fprintln(output, doc)
		v.Add(*doc)
	}
	fmt.Fprintf(
		os.Stderr,
		"%s%d documents identified in %s\n\n%s\n",
		padding,
		len(documents),
		duration.Truncate(time.Millisecond),
		v.Render("Distribution for Number of Associated Files per Document", 60),
	)
}

// run is the main entry point for the program.
func run(args []string) ([]*Document, *os.File, error) {
	flags := flag.NewFlagSet(args[0], flag.ExitOnError)
	flags.StringVar(&dataFilePath, "path", defaultDataFilePath, "path to the file pool")
	flags.Float64Var(&similarityThreshold, "threshold", defaultSimilarityThreshold, "similarity threshold")
	flags.StringVar(&outputFile, "output", "", "output file (default is stdout)")
	flags.IntVar(&numWorkers, "workers", runtime.NumCPU()*2, "number of workers to use")
	flags.BoolVar(&useDocPrefix, "prefix", false, "use '[doc ###]' prefix for output")
	flags.BoolVar(&verbose, "verbose", false, "enable verbose logging")
	_ = flags.Parse(args[1:])

	output := os.Stdout
	if outputFile != "" {
		var err error
		output, err = os.Create(outputFile)
		if err != nil {
			return nil, nil, fmt.Errorf("creating output file: %w", err)
		}
	}
	shouldReportStatus := !verbose && output != os.Stdout
	if shouldReportStatus {
		defer fmt.Fprintln(os.Stderr)
	}

	// The files need to be processed in order of time, so determine the
	// timestamp of each file and sort them by time.
	fileTimes, times, err := orderFiles(dataFilePath)
	if err != nil {
		return nil, nil, err
	}

	dm := NewDocumentManager(dataFilePath, similarityThreshold, numWorkers)

	for i, timestamp := range times {
		var (
			// Track the files at this timestamp that have been associated with
			// documents, so we can identify unassociated files later and then
			// create new documents for them. This needs to be distinct for each
			// timestamp, so it's created inside the timestamp loop's scope.
			claimedFiles sync.Map

			// We might need to create new documents for files that weren't
			// associated with any document at this timestamp, so we need to make
			// sure that this timestamp has been entirely processed first. We do
			// this by waiting for the workers to indicate they've finished a work
			// item.
			wg sync.WaitGroup
		)

		log("processing timestamp", "timestamp", timestamp, "timestampIndex", i, "totalTimestamps", len(times))
		if shouldReportStatus {
			fmt.Fprintf(os.Stderr, "\rProcessing timestamp %d of %d...", i+1, len(times))
		}
		for i, doc := range dm.Documents {
			wg.Add(1)
			dm.WorkCh <- WorkItem{
				doc:          doc,
				fileNumbers:  fileTimes[timestamp],
				timestamp:    timestamp,
				claimedFiles: &claimedFiles,
				wg:           &wg,
			}
			log(
				"submitted work",
				"documentNumber", i+1,
				"documentID", doc.ID,
				"totalDocs", len(dm.Documents),
				"timestamp", timestamp,
			)
		}

		wg.Wait()

		// Now that this timestamp has been fully processed, we can check which
		// files haven't been associated with existing documents, and create new
		// documents for them.
		var docsAdded int
		for _, fileNumber := range fileTimes[timestamp] {
			if _, ok := claimedFiles.Load(fileNumber); !ok {
				dm.AddNewDocument(fileNumber, timestamp)
				docsAdded++
			}
		}
		if docsAdded > 0 {
			log("created new documents", "numAdded", docsAdded, "timestamp", timestamp)
		}

		// Free up memory.
		dm.ShrinkCache()
	}

	dm.Shutdown()
	return dm.SortedDocuments(), output, nil
}

// WorkItem is what will be sent to the workers in the worker pool.
type WorkItem struct {
	doc          *Document
	fileNumbers  []int
	timestamp    int
	claimedFiles *sync.Map
	wg           *sync.WaitGroup
}

// DocumentManager handles the processing of documents and files. It maintains a
// list of documents and a cache of file contents, and uses a pool of workers to
// compare documents against files.
type DocumentManager struct {
	// Documents is the list of documents that have been identified.
	Documents []*Document

	// WorkCh is the channel through which work items are submitted to the workers.
	WorkCh chan WorkItem

	// docIDSource is a concurrency-safe source from which to identify documents.
	// This could easily be something other than an integer, but using this allows
	// us to just use the standard library.
	docIDSource atomic.Uint32

	similarityThreshold float64
	fcc                 *FileContentsCache
	wg                  sync.WaitGroup
}

// NewDocumentManager creates a new DocumentManager with the specified base path
// for the file pool and the specified number of workers.
func NewDocumentManager(fileBasePath string, similarityThreshold float64, numWorkers int) *DocumentManager {
	dm := &DocumentManager{
		Documents:           make([]*Document, 0),
		similarityThreshold: similarityThreshold,
		fcc:                 &FileContentsCache{BaseDir: fileBasePath},
		WorkCh:              make(chan WorkItem),
	}

	// Start workers. Register them with the wait group before launching them,
	// so the Add is guaranteed to happen before any worker's deferred Done.
	dm.wg.Add(numWorkers)
	for wID := range numWorkers {
		go dm.ComparisonWorker(wID + 1)
	}

	return dm
}

// Shutdown cleans up the document manager by closing the work channel to
// trigger workers to exit and then waits for all workers to exit.
func (dm *DocumentManager) Shutdown() {
	close(dm.WorkCh)
	dm.wg.Wait()
}

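// AddNewDocument creates a new document seeded with the given file and
// timestamp and appends it to the document list. It is not concurrency-safe,
// so it must only be called between timestamp rounds, when no workers are busy.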
func (dm *DocumentManager) AddNewDocument(fileNumber, timestamp int) {
	doc := Document{
		ID:              dm.docIDSource.Add(1),
		LatestTimestamp: timestamp,
		AssociatedFiles: []int{fileNumber},
	}
	dm.Documents = append(dm.Documents, &doc)
}

// ShrinkCache removes files from the cache that will never be used again, by
// evicting every file that is not the most recently associated file of some
// document. Note that this is not concurrent-safe and should only be called
// when operations that could modify the document list are not running. This is
// an optimization, but could be removed if memory usage is not a concern.
func (dm *DocumentManager) ShrinkCache() {
	var latestDocumentFiles []int
	for _, doc := range dm.Documents {
		latestDocumentFiles = append(latestDocumentFiles, doc.LatestAssociatedFile())
	}
	dm.fcc.ClearFilesExcept(latestDocumentFiles)
}

// SortedDocuments returns the list of documents with each document's
// associated files in ascending order, and the documents themselves ordered
// by their first associated file.
func (dm *DocumentManager) SortedDocuments() []*Document {
	// Sort the associated files for each document.
	for _, doc := range dm.Documents {
		doc.SortAssociatedFiles()
	}
	// Sort the documents by their first associated file number.
	slices.SortFunc(dm.Documents, func(a, b *Document) int {
		return a.AssociatedFiles[0] - b.AssociatedFiles[0]
	})
	return dm.Documents
}

// ComparisonWorker receives work items describing a document and a list of
// candidate file IDs to compare against. It compares the document against each
// file and, if a match is found, associates the file with the document sent in
// the work item and records the file as having been matched.
func (dm *DocumentManager) ComparisonWorker(workerID int) {
	defer dm.wg.Done()
	for workItem := range dm.WorkCh {
		dm.maybeAssociateFileWithDocument(workItem, workerID)
	}
}

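// maybeAssociateFileWithDocument compares the work item's document against
// each unclaimed candidate file and associates the first one whose similarity
// meets the threshold. Claims are first-come-first-served across workers, so
// which document wins a contested file depends on scheduling.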
func (dm *DocumentManager) maybeAssociateFileWithDocument(workItem WorkItem, workerID int) {
	defer workItem.wg.Done()
	for _, candidateFileNumber := range workItem.fileNumbers {
		if _, ok := workItem.claimedFiles.Load(candidateFileNumber); ok {
			// This file has already been matched with another document, so skip it.
			continue
		}
		latestFileNumber := workItem.doc.LatestAssociatedFile()
		similarity, err := dm.compareFiles(latestFileNumber, candidateFileNumber)
		if err != nil {
			// Simplistic error handling: log the error and continue.
			slog.Error(
				"error comparing files",
				"error", err,
				"latestAssociatedFile", latestFileNumber,
				"candidateFile", candidateFileNumber,
				"document", workItem.doc.ID,
				"worker", workerID,
			)
			continue
		}

		// If the current file matches the current document, record it and exit.
		if similarity >= dm.similarityThreshold {
			// LoadOrStore makes the claim atomic: if another worker claimed this
			// file between the check above and now, loaded is true and the file
			// stays with the document that won the race.
			if _, loaded := workItem.claimedFiles.LoadOrStore(candidateFileNumber, struct{}{}); loaded {
				continue
			}
			workItem.doc.AssociateFile(candidateFileNumber, workItem.timestamp)
			log(
				"match found",
				"document", workItem.doc.ID,
				"file", candidateFileNumber,
				"time", workItem.timestamp,
				"worker", workerID,
			)
			return
		}
	}
}

// compareFiles computes how much two files overlap on a scale of 0 to 1 by
// iterating through the files and calculating a similarity score that's based
// on the number of line-centric differences between the contents of the two
// files.
func (dm *DocumentManager) compareFiles(f1Number, f2Number int) (float64, error) {
	f1, err := dm.fcc.GetLineIDsForFile(f1Number)
	if err != nil {
		return 0, fmt.Errorf("file %d: %w", f1Number, err)
	}
	f2, err := dm.fcc.GetLineIDsForFile(f2Number)
	if err != nil {
		return 0, fmt.Errorf("file %d: %w", f2Number, err)
	}

	similarity := compareFileLineIDs(f1, f2)
	return similarity, nil
}

// Document stores a document ID and a list of associated files.
type Document struct {
	AssociatedFiles []int
	LatestTimestamp int
	ID              uint32
}

// String formats a document for output in the format described in the
// requirements.
func (d Document) String() string {
	var sb strings.Builder
	for _, f := range d.AssociatedFiles {
		sb.WriteString(fmt.Sprintf("%d ", f))
	}
	if useDocPrefix {
		return fmt.Sprintf("[doc %4d] %s", d.ID, strings.TrimSpace(sb.String()))
	}
	return sb.String()
}

// AssociateFile adds a file number to the list of associated files for a
// document, and also records the latest timestamp now associated with the
// document.
func (d *Document) AssociateFile(fileNumber, timestamp int) {
	d.AssociatedFiles = append(d.AssociatedFiles, fileNumber)
	d.LatestTimestamp = timestamp
}

// LatestAssociatedFile returns the most recent file associated with a document.
// Note that this presumes that the list of associated files is sorted in
// temporal order based on the timestamp at the head of the file.
func (d Document) LatestAssociatedFile() int {
	return d.AssociatedFiles[len(d.AssociatedFiles)-1]
}

// SortAssociatedFiles sorts the list of associated files for a document, since
// the requirements stipulate output in ascending numerical order. Note that
// this changes the order of associated files from their original temporal
// order, so must only be invoked when the work is entirely finished.
func (d *Document) SortAssociatedFiles() {
	slices.Sort(d.AssociatedFiles)
}

// FileContentsCache is a cache of file contents, keyed by file number,
// to avoid reading the same file from disk multiple times.
type FileContentsCache struct {
	BaseDir string

	// cache maps a file number to the file's lines ([]string), excluding the
	// leading timestamp line.
	cache sync.Map

	// lineIDs maps a file number to the sorted interned IDs ([]uint64) of its
	// lines.
	lineIDs sync.Map
}

// GetFileContents returns the contents of a file, excluding the first timestamp
// line. If the file is already in the cache, the contents are returned from
// there, otherwise the file is read from disk and the contents are cached.
func (fcc *FileContentsCache) GetFileContents(fileNumber int) ([]string, error) {
	if contents, ok := fcc.cache.Load(fileNumber); ok {
		return contents.([]string), nil
	}
	var (
		fileName = makeFilePath(fcc.BaseDir, fileNumber)
		lines    []string
	)

	f, err := os.Open(fileName)
	if err != nil {
		return nil, err
	}
	defer f.Close()

	s := bufio.NewScanner(f)

	// Read first line and ignore it since it's just the timestamp.
	_ = s.Scan()

	// Read file and store contents in cache.
	for s.Scan() {
		lines = append(lines, s.Text())
	}
	if err := s.Err(); err != nil {
		return nil, err
	}

	fcc.cache.Store(fileNumber, lines)
	return lines, nil
}

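// GetLineIDsForFile returns the sorted line IDs for a file, computing and
// caching them on first use.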
func (fcc *FileContentsCache) GetLineIDsForFile(fileNumber int) ([]uint64, error) {
	if cachedLineIDs, ok := fcc.lineIDs.Load(fileNumber); ok {
		return cachedLineIDs.([]uint64), nil
	}

	lines, err := fcc.GetFileContents(fileNumber)
	if err != nil {
		return nil, fmt.Errorf("adding line IDs for file %d: %w", fileNumber, err)
	}

	lineIDs := makeFileLinesID(lines)
	fcc.lineIDs.Store(fileNumber, lineIDs)
	return lineIDs, nil
}

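// fileLinesCache interns line content: it maps each distinct line of text seen
// so far to a process-wide unique ID drawn from fileLineIDSource, so comparing
// files reduces to comparing integers rather than strings.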
var (
	fileLinesCache   sync.Map
	fileLineIDSource atomic.Uint64
)

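// makeFileLinesID converts a file's lines into a sorted slice of interned line
// IDs. Sorting lets compareFileLineIDs count common lines in a single merge
// pass, at the cost of discarding the original line order.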
func makeFileLinesID(fileLines []string) []uint64 {
	fileLineIDs := make([]uint64, 0, len(fileLines))
	for _, line := range fileLines {
		lineID, ok := fileLinesCache.Load(line)
		if !ok {
			// Another goroutine may intern the same line between the Load above
			// and the store here, so LoadOrStore decides a single winner; the
			// loser's freshly minted ID is simply discarded.
			lineID, _ = fileLinesCache.LoadOrStore(line, fileLineIDSource.Add(1))
		}
		fileLineIDs = append(fileLineIDs, lineID.(uint64))
	}
	slices.Sort(fileLineIDs)
	return fileLineIDs
}

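// compareFileLineIDs scores the overlap of two sorted ID slices with a
// Sørensen-Dice-style coefficient: twice the number of shared lines divided by
// the total number of lines. For example, f1 = [1 2 3] and f2 = [2 3 4] share
// two IDs, giving 2*2/(3+3) ≈ 0.67; identical files score 1, disjoint files 0.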
func compareFileLineIDs(f1, f2 []uint64) float64 {
	if len(f1)+len(f2) == 0 {
		return 0
	}

	// Walk both sorted slices in a single merge pass, counting matches.
	var (
		f2Index    int
		similarity int
	)
	for _, lineID := range f1 {
		// Skip past any IDs in f2 smaller than the current ID; without this, a
		// single unmatched leading line in f2 would hide every later match.
		for f2Index < len(f2) && f2[f2Index] < lineID {
			f2Index++
		}
		if f2Index < len(f2) && f2[f2Index] == lineID {
			similarity += 2
			f2Index++
		}
	}

	return float64(similarity) / float64(len(f1)+len(f2))
}

// ClearFilesExcept removes the contents of the fileContentsCache except for the
// provided file numbers. This helps conserve memory by removing the contents of
// files that are no longer of interest, which we can be sure of since we are
// proceeding in order of time.
func (fcc *FileContentsCache) ClearFilesExcept(fileNumbers []int) {
	// Build up a list of entries to delete to avoid modifying the concurrent
	// map while iterating over it.
	var toDelete []int
	fcc.cache.Range(func(key, _ any) bool {
		storedFileNum := key.(int)
		if !slices.Contains(fileNumbers, storedFileNum) {
			toDelete = append(toDelete, storedFileNum)
		}
		return true
	})
	for _, k := range toDelete {
		fcc.cache.Delete(k)
		// The line-ID slices for evicted files will never be needed again
		// either; if one somehow is, it will simply be recomputed.
		fcc.lineIDs.Delete(k)
	}
}

// readFileTime reads the first line of the file, which represents a
// time/version. The integer value will be returned.
func readFileTime(filepath string) (int, error) {
	file, err := os.Open(filepath)
	if err != nil {
		return 0, err
	}
	defer file.Close()

	s := bufio.NewScanner(file)
	var firstLine string
	if s.Scan() {
		firstLine = s.Text()
	}
	if err := s.Err(); err != nil {
		return 0, err
	}

	// Named t rather than time to avoid shadowing the time package.
	t, err := strconv.Atoi(firstLine)
	if err != nil {
		return 0, fmt.Errorf("invalid time %s: %w", firstLine, err)
	}
	return t, nil
}

// orderFiles determines the timestamp version of each file and creates a map of
// time to file numbers. It sorts the times (since maps are not ordered) so that
// the map can be iterated in order of time. This allows stepping through the
// history of the files from the beginning. Using this, we can construct a
// "chain" of evolution for a given document.
func orderFiles(dir string) (map[int][]int, []int, error) {
	timeMap := make(map[int][]int)

	dirEntries, err := os.ReadDir(dir)
	if err != nil {
		return nil, nil, fmt.Errorf("reading directory %s: %w", dir, err)
	}
	for _, entry := range dirEntries {
		if entry.IsDir() || !strings.HasSuffix(entry.Name(), ".txt") {
			continue
		}

		// Get the numeric representation of the file number.
		var fileNumber int
		{
			numberStr := strings.TrimSuffix(entry.Name(), ".txt")
			var err error
			if fileNumber, err = strconv.Atoi(numberStr); err != nil {
				return nil, nil, fmt.Errorf("invalid file number in file name %q: %w", entry.Name(), err)
			}
		}

		filePath := path.Join(dir, entry.Name())
		fileTime, err := readFileTime(filePath)
		if err != nil {
			return nil, nil, err
		}
		// append handles the nil slice for a first-seen time, so no explicit
		// initialization of the map entry is needed.
		timeMap[fileTime] = append(timeMap[fileTime], fileNumber)
	}

	// Now make a slice of the times and sort them, so we can iterate through
	// them in order.
	timeSlice := make([]int, 0, len(timeMap))
	for k := range timeMap {
		timeSlice = append(timeSlice, k)
	}
	slices.Sort(timeSlice)
	return timeMap, timeSlice, nil
}

// Visualizer is a utility to provide insight into the shape of the data
// processed. It maps a number of associated files to the count of documents
// that have that many files.
type Visualizer map[int]int

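// Add records one document in the histogram, bucketed by how many files are
// associated with it.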
func (v Visualizer) Add(d Document) {
	numAssocFiles := len(d.AssociatedFiles)
	v[numAssocFiles]++
}

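// Render draws a horizontal bar chart of the histogram, most common bucket
// first, with bars scaled so the largest fills the available width. Each row
// looks roughly like "  3 | ********** (12.5%)".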
func (v Visualizer) Render(title string, width int) string {
	if len(v) == 0 {
		return ""
	}

	type pair struct {
		numAssocFiles           int
		numDocsWithThisNumFiles int
	}

	var (
		slicedMap    []pair
		totalNumDocs int
	)
	for naf, nd := range v {
		slicedMap = append(slicedMap, pair{numAssocFiles: naf, numDocsWithThisNumFiles: nd})
		totalNumDocs += nd
	}
	// Sort rows so the most common bucket comes first.
	slices.SortFunc(slicedMap, func(a, b pair) int {
		return b.numDocsWithThisNumFiles - a.numDocsWithThisNumFiles
	})

	var sb strings.Builder
	sb.WriteString(title)
	sb.WriteRune('\n')
	sb.WriteString(strings.Repeat("=", width))
	sb.WriteRune('\n')

	// Scale the bars so the largest row occupies the full width, less room for
	// the label and percentage.
	scaleFactor := float64(totalNumDocs) / float64(slicedMap[0].numDocsWithThisNumFiles)
	for _, p := range slicedMap {
		ratio := float64(p.numDocsWithThisNumFiles) / float64(totalNumDocs)
		numChars := int(math.Ceil(ratio * float64(width-14) * scaleFactor))
		sb.WriteString(fmt.Sprintf(
			"%3d | %s (%.1f%%)\n",
			p.numAssocFiles,
			strings.Repeat("*", numChars),
			float64(p.numDocsWithThisNumFiles)*100/float64(totalNumDocs),
		))
	}
	return strings.TrimSuffix(sb.String(), "\n")
}

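// makeFileName returns the pool file name for a file number, e.g. "42.txt".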
func makeFileName(number int) string {
	return fmt.Sprintf("%d.txt", number)
}

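// makeFilePath joins the pool directory and the file name for a file number.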
func makeFilePath(dataFilePath string, number int) string {
	return path.Join(dataFilePath, makeFileName(number))
}

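// log emits an info-level structured log line, but only when the -verbose flag
// is set.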
func log(msg string, args ...any) {
	if verbose {
		slog.Info(msg, args...)
	}
}