Fix nvidia-smi parsing

This commit is contained in:
Ingo Oppermann 2024-12-09 16:21:41 +01:00
parent cfc5b7d16f
commit 64a2136501
No known key found for this signature in database
GPG Key ID: 2AB32426E9DD229E
4 changed files with 197 additions and 42 deletions

View File

@ -0,0 +1,22 @@
# gpu pid type sm mem enc dec jpg ofa fb ccpm command
# Idx # C/G % % % % % % MB MB name
0 3908637 C 5 0 3 0 - - 1209 0 ffmpeg
1 - - - - - - - - - - -
0 3908637 C 4 0 3 0 - - 1209 0 ffmpeg
1 - - - - - - - - - - -
0 3908637 C 4 0 3 0 - - 1209 0 ffmpeg
1 - - - - - - - - - - -
0 3908637 C 5 0 3 0 - - 1209 0 ffmpeg
1 - - - - - - - - - - -
0 3908637 C 5 0 3 0 - - 1209 0 ffmpeg
1 - - - - - - - - - - -
0 3908637 C 5 0 3 0 - - 1209 0 ffmpeg
1 - - - - - - - - - - -
0 3908637 C 5 0 3 0 - - 1209 0 ffmpeg
1 - - - - - - - - - - -
0 3908637 C 5 0 3 0 - - 1209 0 ffmpeg
1 - - - - - - - - - - -
0 3908637 C 5 0 3 0 - - 1209 0 ffmpeg
1 - - - - - - - - - - -
0 3908637 C 5 0 3 0 - - 1209 0 ffmpeg
1 - - - - - - - - - - -

View File

@ -9,6 +9,7 @@ import (
"regexp"
"slices"
"strconv"
"strings"
"sync"
"time"
@ -140,6 +141,7 @@ type writerProcess struct {
buf bytes.Buffer
ch chan Process
terminator []byte
matcher *processMatcher
}
func (w *writerProcess) Write(data []byte) (int, error) {
@ -160,7 +162,7 @@ func (w *writerProcess) Write(data []byte) (int, error) {
break
}
s, err := parseProcess(content)
s, err := w.matcher.Parse(content)
if err != nil {
continue
}
@ -171,66 +173,96 @@ func (w *writerProcess) Write(data []byte) (int, error) {
return n, nil
}
const processMatcher = `^\s*([0-9]+)\s+([0-9]+)\s+[A-Z]\s+([0-9-]+)\s+[0-9-]+\s+([0-9-]+)\s+([0-9-]+)\s+([0-9]+).*`
type processMatcher struct {
re *regexp.Regexp
mapping map[string]int
}
// # gpu pid type sm mem enc dec fb command
// # Idx # C/G % % % % MB name
//
// 0 7372 C 2 0 2 - 136 ffmpeg
// 0 12176 C 5 2 3 7 782 ffmpeg
// 0 20035 C 8 2 4 1 1145 ffmpeg
// 0 20141 C 2 1 1 3 429 ffmpeg
// 0 29591 C 2 1 - 2 435 ffmpeg
var reProcessMatcher = regexp.MustCompile(processMatcher)
func newProcessMatcher() *processMatcher {
m := &processMatcher{
re: regexp.MustCompile(`\s+`),
mapping: map[string]int{},
}
func parseProcess(data []byte) (Process, error) {
return m
}
func (m *processMatcher) Parse(data []byte) (Process, error) {
p := Process{}
if len(data) == 0 {
return p, fmt.Errorf("empty line")
}
if data[0] == '#' {
line := string(data)
if strings.HasPrefix(line, "# gpu") {
m.mapping = map[string]int{}
columns := m.re.Split(strings.TrimPrefix(line, "# "), -1)
for i, column := range columns {
m.mapping[column] = i
}
}
if line[0] == '#' {
return p, fmt.Errorf("comment")
}
matches := reProcessMatcher.FindStringSubmatch(string(data))
if matches == nil {
columns := m.re.Split(strings.TrimSpace(line), -1)
if len(columns) == 0 {
return p, fmt.Errorf("no matches found")
}
if len(matches) != 7 {
return p, fmt.Errorf("not the expected number of matches found")
if columns[0] == line {
return p, fmt.Errorf("no matches found")
}
if d, err := strconv.ParseInt(matches[1], 10, 0); err == nil {
p.Index = int(d)
}
if d, err := strconv.ParseInt(matches[2], 10, 32); err == nil {
p.PID = int32(d)
}
if matches[3][0] != '-' {
if d, err := strconv.ParseFloat(matches[3], 64); err == nil {
p.Usage = d
if index, ok := m.mapping["gpu"]; ok {
if d, err := strconv.ParseInt(columns[index], 10, 0); err == nil {
p.Index = int(d)
}
}
if matches[4][0] != '-' {
if d, err := strconv.ParseFloat(matches[4], 64); err == nil {
p.Encoder = d
if index, ok := m.mapping["pid"]; ok {
if d, err := strconv.ParseInt(columns[index], 10, 32); err == nil {
p.PID = int32(d)
}
}
if matches[5][0] != '-' {
if d, err := strconv.ParseFloat(matches[5], 64); err == nil {
p.Decoder = d
if index, ok := m.mapping["sm"]; ok {
if columns[index] != "-" {
if d, err := strconv.ParseFloat(columns[index], 64); err == nil {
p.Usage = d
}
}
}
if d, err := strconv.ParseUint(matches[6], 10, 64); err == nil {
p.Memory = d * 1024 * 1024
if index, ok := m.mapping["enc"]; ok {
if columns[index] != "-" {
if d, err := strconv.ParseFloat(columns[index], 64); err == nil {
p.Encoder = d
}
}
}
if index, ok := m.mapping["dec"]; ok {
if columns[index] != "-" {
if d, err := strconv.ParseFloat(columns[index], 64); err == nil {
p.Decoder = d
}
}
}
if index, ok := m.mapping["fb"]; ok {
if columns[index] != "-" {
if d, err := strconv.ParseUint(columns[index], 10, 64); err == nil {
p.Memory = d * 1024 * 1024
}
}
}
if p.PID == 0 {
return p, fmt.Errorf("no process found")
}
return p, nil
@ -271,6 +303,7 @@ func New(path string) gpu.GPU {
n.wrProcess = &writerProcess{
ch: make(chan Process, 32),
terminator: []byte("\n"),
matcher: newProcessMatcher(),
}
ctx, cancel := context.WithCancel(context.Background())
@ -379,12 +412,14 @@ func (n *nvidia) runProcessOnce(path string) (map[int32]Process, error) {
return nil, err
}
matcher := newProcessMatcher()
lines := bytes.Split(data.Bytes(), []byte{'\n'})
process := map[int32]Process{}
for _, line := range lines {
p, err := parseProcess(line)
p, err := matcher.Parse(line)
if err != nil {
continue
}

View File

@ -85,15 +85,47 @@ func TestParseQuery(t *testing.T) {
}, nv)
}
func TestParseProcess(t *testing.T) {
data, err := os.ReadFile("./fixtures/process.txt")
func TestProcessMatcher(t *testing.T) {
matcher := newProcessMatcher()
_, err := matcher.Parse([]byte("# gpu pid type sm mem enc dec fb command"))
require.Error(t, err)
require.Equal(t, map[string]int{
"gpu": 0,
"pid": 1,
"type": 2,
"sm": 3,
"mem": 4,
"enc": 5,
"dec": 6,
"fb": 7,
"command": 8,
}, matcher.mapping)
p, err := matcher.Parse([]byte(" 0 7372 C 42 1 12 34 136 ffmpeg "))
require.NoError(t, err)
require.Equal(t, Process{
Index: 0,
PID: 7372,
Memory: 136 * 1024 * 1024,
Usage: 42,
Encoder: 12,
Decoder: 34,
}, p)
}
func TestParseProcess(t *testing.T) {
data, err := os.ReadFile("./fixtures/process_1.txt")
require.NoError(t, err)
matcher := newProcessMatcher()
lines := bytes.Split(data, []byte("\n"))
process := map[int32]Process{}
for _, line := range lines {
p, err := parseProcess(line)
p, err := matcher.Parse(line)
if err != nil {
continue
}
@ -143,17 +175,44 @@ func TestParseProcess(t *testing.T) {
Decoder: 1,
},
}, process)
data, err = os.ReadFile("./fixtures/process_2.txt")
require.NoError(t, err)
lines = bytes.Split(data, []byte("\n"))
process = map[int32]Process{}
for _, line := range lines {
p, err := matcher.Parse(line)
if err != nil {
continue
}
process[p.PID] = p
}
require.Equal(t, map[int32]Process{
3908637: {
Index: 0,
PID: 3908637,
Memory: 1209 * 1024 * 1024,
Usage: 5,
Encoder: 3,
Decoder: 0,
},
}, process)
}
func TestParseProcessNoProcesses(t *testing.T) {
data, err := os.ReadFile("./fixtures/process_noprocesses.txt")
require.NoError(t, err)
matcher := newProcessMatcher()
lines := bytes.Split(data, []byte("\n"))
process := map[int32]Process{}
for _, line := range lines {
p, err := parseProcess(line)
p, err := matcher.Parse(line)
if err != nil {
continue
}
@ -219,12 +278,13 @@ func TestWriterQuery(t *testing.T) {
}
func TestWriterProcess(t *testing.T) {
data, err := os.ReadFile("./fixtures/process.txt")
data, err := os.ReadFile("./fixtures/process_1.txt")
require.NoError(t, err)
wr := &writerProcess{
ch: make(chan Process, 32),
terminator: []byte("\n"),
matcher: newProcessMatcher(),
}
process := map[int32]Process{}
@ -287,6 +347,44 @@ func TestWriterProcess(t *testing.T) {
Decoder: 1,
},
}, process)
data, err = os.ReadFile("./fixtures/process_2.txt")
require.NoError(t, err)
wr = &writerProcess{
ch: make(chan Process, 32),
terminator: []byte("\n"),
matcher: newProcessMatcher(),
}
process = map[int32]Process{}
wg = sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
for p := range wr.ch {
process[p.PID] = p
}
}()
_, err = wr.Write(data)
require.NoError(t, err)
close(wr.ch)
wg.Wait()
require.Equal(t, map[int32]Process{
3908637: {
Index: 0,
PID: 3908637,
Memory: 1209 * 1024 * 1024,
Usage: 5,
Encoder: 3,
Decoder: 0,
},
}, process)
}
func TestNvidiaGPUCount(t *testing.T) {