# IC3 Backend — Redis REST Migration + Monitoring Worker
## Claude Code Execution Prompt

---

> **How to use this file**
> 1. Open Claude Code in your IC3 Go backend project root
> 2. Run `claude`, then paste this file's contents or drag the file into the session
> 3. Or run directly: `claude < IC3_Redis_Migration_ClaudeCode.md`

---

## Context

I have a Go backend (`package main`) for the IC3 water utility platform.

**Current state:**
- `redis.go` uses `github.com/redis/go-redis/v9` connecting to `13.237.154.232:6379`
- Live Redis data flows: Redis → `scanAndBroadcast()` → `db.saveBatch()` → `telemetry` / `asset_latest` tables
- A new worker file `monitoring_worker.go` has been added that maps Redis tag values into `ic3_asset_monitoring_data` via `ic3_asset_tag_map` + `ic3_parameter_master`

**What needs to change:**
- The old Redis instance (`13.237.154.232:6379`, raw protocol) must be replaced with a new Redis REST API at `http://3.108.55.133:8181`
- The REST API is called via HTTP, NOT the go-redis library
- Add progress tracking and two new live-feed endpoints

**Sample REST call that works today:**
```bash
curl "http://3.108.55.133:8181/api/redis/key?k=ic3%3Alatest%3APatel%20Road_Baljeet%20Nagar_SV_92"
```
Returns:
```json
{
  "key": "ic3:latest:Patel Road_Baljeet Nagar_SV_92",
  "type": "string",
  "value": "{\"record_id\":\"40470dce-f366-4933-b220-36b00ab8824d\",\"system_id\":\"Patel_Road_Baljeet_Nagar_SV_92\",\"asset_id\":\"Patel Road_Baljeet Nagar_SV_92\",\"domain\":\"DEL-D08\",\"protocol\":\"rest\",\"occurred_at\":\"2026-06-15T01:41:43.267609313Z\",\"values\":{\"HEALTH_SCORE\":94.8,\"LAST_INSP_DT\":53.64,\"MAINT_FLAG\":0,\"VLV_MAN_STS\":52.08,\"WO_STS\":54.67},\"quality\":\"GOOD\",\"alarm\":\"NORMAL\"}"
}
```

List all keys:
```bash
curl "http://3.108.55.133:8181/api/redis/keys?pattern=ic3%3Alatest%3A*"
```

---

## TASK 1 — Replace go-redis with HTTP REST client in redis.go

**Remove:**
- Constants `redisAddr`, `redisUser`, `redisPwd`
- `import "github.com/redis/go-redis/v9"`
- `func newRedisClient() *redis.Client`

**Add a new struct `RedisRESTClient`:**
```go
type RedisRESTClient struct {
    BaseURL    string
    HTTPClient *http.Client
}

func newRedisClient() *RedisRESTClient {
    base := os.Getenv("REDIS_API_BASE")
    if base == "" {
        base = "http://3.108.55.133:8181"
    }
    return &RedisRESTClient{
        BaseURL: base,
        HTTPClient: &http.Client{Timeout: 10 * time.Second},
    }
}
```

**Add method `ScanKeys`:**
```go
// GET /api/redis/keys?pattern=<encoded>
// Returns []string of matching key names
func (r *RedisRESTClient) ScanKeys(ctx context.Context, pattern string) ([]string, error)
```
- URL: `r.BaseURL + "/api/redis/keys?pattern=" + url.QueryEscape(pattern)`
- Parse response: `{"keys": ["ic3:latest:...", ...]}`  OR `["ic3:latest:...", ...]`
  (handle both array formats)
- Return `[]string`
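The two response shapes listed above can be handled by trying the wrapped object first and falling back to a bare array. The following is a minimal sketch of that parsing step only; `parseKeyList` is a hypothetical helper name, and the exact shape the REST API returns should be confirmed against a real response.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// parseKeyList accepts either {"keys": [...]} or a bare [...] payload.
// Hypothetical helper: the name and error text are illustrative only.
func parseKeyList(body []byte) ([]string, error) {
	// Shape 1: object with a "keys" field.
	var wrapped struct {
		Keys []string `json:"keys"`
	}
	if err := json.Unmarshal(body, &wrapped); err == nil && wrapped.Keys != nil {
		return wrapped.Keys, nil
	}
	// Shape 2: bare JSON array of strings.
	var bare []string
	if err := json.Unmarshal(body, &bare); err != nil {
		return nil, fmt.Errorf("unrecognised keys payload: %w", err)
	}
	return bare, nil
}

func main() {
	for _, payload := range []string{
		`{"keys": ["ic3:latest:a", "ic3:latest:b"]}`,
		`["ic3:latest:a", "ic3:latest:b"]`,
	} {
		keys, err := parseKeyList([]byte(payload))
		fmt.Println(keys, err)
	}
}
```

`ScanKeys` could call a helper like this on the response body after checking the HTTP status code.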

**Add method `GetKey`:**
```go
// GET /api/redis/key?k=<encoded>
// Returns the raw value string (the inner JSON string from "value" field)
func (r *RedisRESTClient) GetKey(ctx context.Context, key string) (string, error)
```
- URL: `r.BaseURL + "/api/redis/key?k=" + url.QueryEscape(key)`
- Parse response: `{"key": "...", "type": "string", "value": "..."}` → return `value` field
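As a rough sketch of the two pure steps inside `GetKey` (building the URL and extracting the `value` field), assuming the response shape shown in the Context section. `keyURL` and `parseKeyValue` are hypothetical helper names. One detail worth checking: `url.QueryEscape` encodes spaces as `+`, whereas the sample curl call uses `%20`. Most query-string parsers treat the two the same, but this has not been verified against the REST API.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/url"
)

// keyURL builds the single-key lookup URL (hypothetical helper).
func keyURL(baseURL, key string) string {
	return baseURL + "/api/redis/key?k=" + url.QueryEscape(key)
}

// parseKeyValue extracts the "value" field, which is itself a JSON
// document encoded as a string (hypothetical helper).
func parseKeyValue(body []byte) (string, error) {
	var resp struct {
		Key   string `json:"key"`
		Type  string `json:"type"`
		Value string `json:"value"`
	}
	if err := json.Unmarshal(body, &resp); err != nil {
		return "", fmt.Errorf("decode key response: %w", err)
	}
	return resp.Value, nil
}

func main() {
	fmt.Println(keyURL("http://3.108.55.133:8181", "ic3:latest:Patel Road_Baljeet Nagar_SV_92"))

	body := []byte(`{"key":"ic3:latest:demo","type":"string","value":"{\"quality\":\"GOOD\"}"}`)
	val, err := parseKeyValue(body)
	fmt.Println(val, err)
}
```

The returned string is the inner JSON that the existing `TelemetryEnvelope` parsing already consumes, so the downstream logic should not need to change.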

**Add method `MGetAll`:**
```go
// Calls GetKey concurrently for all keys, max 20 parallel goroutines
// Returns map[key]valueString
func (r *RedisRESTClient) MGetAll(ctx context.Context, keys []string) (map[string]string, error)
```
- Use a semaphore channel of size 20 to limit concurrency
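- Collect results into `map[string]string`, guarding every write with a `sync.Mutex` (unsynchronised concurrent map writes are a fatal runtime error in Go)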
- Skip keys that return errors (log them)
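The bounded-concurrency pattern described above can be sketched independently of HTTP. This is an illustration under assumptions, not the required implementation: `fetchAll`, `fetchFunc`, and `demo` are hypothetical names, and in the real method the fetch function would be `r.GetKey`.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"log"
	"sync"
)

// fetchFunc stands in for RedisRESTClient.GetKey (hypothetical type).
type fetchFunc func(ctx context.Context, key string) (string, error)

// fetchAll runs fetch for every key with at most limit calls in flight.
// Keys whose fetch fails are logged and left out of the result.
func fetchAll(ctx context.Context, keys []string, limit int, fetch fetchFunc) map[string]string {
	if limit < 1 {
		limit = 1 // a zero-capacity semaphore would block forever
	}
	results := make(map[string]string, len(keys))
	var (
		mu sync.Mutex // protects results
		wg sync.WaitGroup
	)
	sem := make(chan struct{}, limit)
	for _, key := range keys {
		wg.Add(1)
		sem <- struct{}{} // acquire a slot before starting the goroutine
		go func(k string) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot
			val, err := fetch(ctx, k)
			if err != nil {
				log.Printf("fetch %q: %v", k, err)
				return
			}
			mu.Lock()
			results[k] = val
			mu.Unlock()
		}(key)
	}
	wg.Wait()
	return results
}

// demo exercises fetchAll with a stub that fails for one key.
func demo() map[string]string {
	stub := func(_ context.Context, key string) (string, error) {
		if key == "ic3:latest:bad" {
			return "", errors.New("simulated failure")
		}
		return "value-of-" + key, nil
	}
	keys := []string{"ic3:latest:a", "ic3:latest:b", "ic3:latest:bad"}
	return fetchAll(context.Background(), keys, 20, stub)
}

func main() {
	fmt.Println(len(demo()), "keys fetched")
}
```

Acquiring the semaphore slot in the loop, rather than inside the goroutine, also caps the number of goroutines that exist at once, which matters when the key list is large.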

**Update `pollRedis` signature:**
```go
func pollRedis(ctx context.Context, rdb *RedisRESTClient, db *DB, hub *Hub)
```

**Update `scanAndBroadcast` signature and body:**
```go
func scanAndBroadcast(ctx context.Context, rdb *RedisRESTClient, db *DB, hub *Hub)
```
- Replace `rdb.Scan(ctx, cursor, "ic3:latest:*", 200)` → `rdb.ScanKeys(ctx, "ic3:latest:*")`
- Replace `rdb.MGet(ctx, keys...)` → `rdb.MGetAll(ctx, keys)`
- Replace `v.(string)` loop → iterate `map[string]string` from MGetAll
- Keep all other logic (TelemetryEnvelope parsing, hub.broadcast, db.saveBatch) identical

---

## TASK 2 — Update monitoring_worker.go

In `monitoring_worker.go` (already exists in project):

**Change function signature:**
```go
// OLD
func RunMonitoringIngestWorker(ctx context.Context, db *DB, rdb *redis.Client)
func runMonitoringIngest(ctx context.Context, db *DB, rdb *redis.Client) error

// NEW
func RunMonitoringIngestWorker(ctx context.Context, db *DB, rdb *RedisRESTClient)
func runMonitoringIngest(ctx context.Context, db *DB, rdb *RedisRESTClient) error
```

**Replace Redis calls:**
```go
// OLD — remove these
var cursor uint64
for {
    k, next, err := rdb.Scan(ctx, cursor, "ic3:latest:*", 500).Result()
    ...
}
vals, err := rdb.MGet(ctx, keys...).Result()

// NEW — replace with
keys, err := rdb.ScanKeys(ctx, "ic3:latest:*")
if err != nil { return fmt.Errorf("redis scan: %w", err) }

valMap, err := rdb.MGetAll(ctx, keys)
if err != nil { return fmt.Errorf("redis mget: %w", err) }
```

**Update the value iteration loop:**
```go
// OLD
for i, v := range vals {
    s, ok := v.(string)
    ...
    serialNumber := keys[i][len("ic3:latest:"):]

// NEW
for key, s := range valMap {
    serialNumber := key[len("ic3:latest:"):]
    // rest of logic unchanged
```
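The slice expression `key[len("ic3:latest:"):]` panics if a key is ever shorter than the prefix. A slightly more defensive variant, sketched here with a hypothetical helper `serialFromKey`, uses `strings.CutPrefix` (available since Go 1.20) and reports whether the prefix was present:

```go
package main

import (
	"fmt"
	"strings"
)

const latestPrefix = "ic3:latest:"

// serialFromKey returns the serial number encoded in a Redis key and
// false when the key lacks the expected prefix or has an empty suffix.
// Hypothetical helper, not part of the existing worker.
func serialFromKey(key string) (string, bool) {
	serial, ok := strings.CutPrefix(key, latestPrefix)
	if !ok || serial == "" {
		return "", false
	}
	return serial, true
}

func main() {
	fmt.Println(serialFromKey("ic3:latest:Patel Road_Baljeet Nagar_SV_92"))
	fmt.Println(serialFromKey("other:key"))
}
```

Inside the loop, a key that returns `false` could simply be skipped with `continue`.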

---

## TASK 3 — Update pump_health.go, nrw_engine.go, anomaly_detection.go

For each of these files:

1. Find any function parameter typed `*redis.Client`
2. Change it to `*RedisRESTClient`
3. Replace any `rdb.Get(ctx, key).Result()` with `rdb.GetKey(ctx, key)`
4. Replace any `rdb.Scan(...)` with `rdb.ScanKeys(ctx, pattern)`
5. Remove any direct `redis.` package references

If none of these files use `*redis.Client` directly, skip them.

---

## TASK 4 — Update main.go

**Change `rdb` variable declaration:**
```go
// OLD
rdb := newRedisClient()  // was *redis.Client

// NEW — no change needed, newRedisClient() now returns *RedisRESTClient
rdb := newRedisClient()
```

**Add the new monitoring worker** in the worker startup block:
```go
// Start AI workers
if db != nil {
    go RunAnomalyDetectionWorker(ctx, db, rdb)
    go RunPumpHealthWorker(ctx, db, rdb)
    go RunNRWEngine(ctx, db, rdb)
    go RunMonitoringIngestWorker(ctx, db, rdb)   // ← ADD THIS LINE
}
```

**Register the two new endpoints** (add after existing route registrations):
```go
// Monitoring — progress log + live feed
mux.HandleFunc("GET /api/monitoring/progress", cors(authMiddleware(monitoringProgressHandler(db))))
mux.HandleFunc("GET /api/monitoring/live",     cors(authMiddleware(monitoringLiveHandler(db))))  // ?serial_number=<serialnumber>
```

---

## TASK 5 — Add progress_feed.go (new file)

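Create a new file `progress_feed.go` in the project root. It holds the struct (5b), the save function (5c), and the two handlers (5e, 5f); the migration (5a) and the worker call (5d) go into existing files as noted below: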

### 5a. Progress log table migration

Add this SQL to `migrations.go` inside `RunMigrations()`:
```sql
CREATE TABLE IF NOT EXISTS ic3_ingest_run_log (
    id            bigserial PRIMARY KEY,
    run_at        timestamp DEFAULT now(),
    keys_scanned  int  DEFAULT 0,
    matched       int  DEFAULT 0,
    unmatched     int  DEFAULT 0,
    rows_inserted int  DEFAULT 0,
    tags_skipped  int  DEFAULT 0,
    duration_ms   int  DEFAULT 0,
    error_msg     text
);
```

### 5b. IngestRunSummary struct
```go
type IngestRunSummary struct {
    ID           int64     `json:"id"`
    RunAt        time.Time `json:"run_at"`
    KeysScanned  int       `json:"keys_scanned"`
    Matched      int       `json:"matched"`
    Unmatched    int       `json:"unmatched"`
    RowsInserted int       `json:"rows_inserted"`
    TagsSkipped  int       `json:"tags_skipped"`
    DurationMS   int       `json:"duration_ms"`
    ErrorMsg     *string   `json:"error_msg,omitempty"`
}
```

### 5c. saveIngestRun function
```go
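// saveIngestRun records one ingest run. A failed insert is logged rather than
// returned, so a logging problem never aborts the ingest itself.
// Requires the "log" import.
func (db *DB) saveIngestRun(ctx context.Context, s IngestRunSummary) {
    _, err := db.pool.Exec(ctx, `
        INSERT INTO ic3_ingest_run_log
               (keys_scanned, matched, unmatched, rows_inserted, tags_skipped, duration_ms, error_msg)
        VALUES ($1, $2, $3, $4, $5, $6, $7)`,
        s.KeysScanned, s.Matched, s.Unmatched, s.RowsInserted, s.TagsSkipped, s.DurationMS,
        s.ErrorMsg,
    )
    if err != nil {
        log.Printf("[MonitoringWorker] save ingest run: %v", err)
    }
}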
```

### 5d. Call saveIngestRun at end of runMonitoringIngest()

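In `monitoring_worker.go`, at the very end of `runMonitoringIngest()`, before returning nil (the counters and `start` below are assumed to exist in the worker already; rename them to match the actual variables):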
```go
summary := IngestRunSummary{
    KeysScanned:  totalKeys,
    Matched:      matched,
    Unmatched:    unmatched,
    RowsInserted: rowsInserted,
    TagsSkipped:  tagsSkipped,
    DurationMS:   int(time.Since(start).Milliseconds()),
}
db.saveIngestRun(ctx, summary)
```

### 5e. GET /api/monitoring/progress handler
```go
func monitoringProgressHandler(db *DB) http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {
        rows, err := db.pool.Query(r.Context(), `
            SELECT id, run_at, keys_scanned, matched, unmatched,
                   rows_inserted, tags_skipped, duration_ms, error_msg
            FROM ic3_ingest_run_log
            ORDER BY run_at DESC
            LIMIT 20`)
        if err != nil {
            http.Error(w, err.Error(), 500)
            return
        }
        defer rows.Close()

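        var runs []IngestRunSummary
        for rows.Next() {
            var s IngestRunSummary
            // Surface scan failures instead of appending a half-filled row.
            if err := rows.Scan(&s.ID, &s.RunAt, &s.KeysScanned, &s.Matched,
                &s.Unmatched, &s.RowsInserted, &s.TagsSkipped,
                &s.DurationMS, &s.ErrorMsg); err != nil {
                http.Error(w, err.Error(), http.StatusInternalServerError)
                return
            }
            runs = append(runs, s)
        }
        // rows.Next() returns false on both exhaustion and error.
        if err := rows.Err(); err != nil {
            http.Error(w, err.Error(), http.StatusInternalServerError)
            return
        }
        if runs == nil {
            runs = []IngestRunSummary{} // encode as [] rather than null
        }
        var latest *IngestRunSummary
        if len(runs) > 0 {
            latest = &runs[0]
        }
        w.Header().Set("Content-Type", "application/json")
        json.NewEncoder(w).Encode(map[string]any{
            "runs":   runs,
            "latest": latest,
        })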
    }
}
```

### 5f. GET /api/monitoring/live handler
```go
// Query param: serial_number (string, required)
//              matches ic3_asset_master.serialnumber, the same value as the Redis key suffix
// Example: /api/monitoring/live?serial_number=Patel%20Road_Baljeet%20Nagar_SV_92
//          (spaces in the serial number must be URL-encoded by the caller)
// Returns the latest reading per tag for that asset
func monitoringLiveHandler(db *DB) http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {
        serialNumber := r.URL.Query().Get("serial_number")
        if serialNumber == "" {
            http.Error(w, `{"error":"serial_number required"}`, 400)
            return
        }

        rows, err := db.pool.Query(r.Context(), `
            SELECT DISTINCT ON (d.tag_code)
                   am.serialnumber,
                   am.asset_id,
                   d.tag_code,
                   pm.parameter_name,
                   pm.param_tag_code,
                   pm.unit,
                   d.value_num,
                   d.value_text,
                   d.recorded_at
            FROM ic3_asset_master am
            JOIN ic3_asset_monitoring_data d  ON d.asset_id  = am.asset_id
            JOIN ic3_parameter_master      pm ON pm.param_id = d.param_id
            WHERE am.serialnumber = $1
            ORDER BY d.tag_code, d.recorded_at DESC`, serialNumber)
        if err != nil {
            http.Error(w, err.Error(), 500)
            return
        }
        defer rows.Close()

        type LiveParam struct {
            SerialNumber  string    `json:"serial_number"`
            AssetID       int       `json:"asset_id"`
            TagCode       string    `json:"tag_code"`
            ParameterName string    `json:"parameter_name"`
            ParamTagCode  string    `json:"param_tag_code"`
            Unit          *string   `json:"unit"`
            ValueNum      *float64  `json:"value_num"`
            ValueText     *string   `json:"value_text"`
            RecordedAt    time.Time `json:"recorded_at"`
        }

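        var params []LiveParam
        for rows.Next() {
            var p LiveParam
            // Surface scan failures instead of appending a half-filled row.
            if err := rows.Scan(
                &p.SerialNumber, &p.AssetID,
                &p.TagCode, &p.ParameterName, &p.ParamTagCode,
                &p.Unit, &p.ValueNum, &p.ValueText, &p.RecordedAt,
            ); err != nil {
                http.Error(w, err.Error(), http.StatusInternalServerError)
                return
            }
            params = append(params, p)
        }
        // rows.Next() returns false on both exhaustion and error.
        if err := rows.Err(); err != nil {
            http.Error(w, err.Error(), http.StatusInternalServerError)
            return
        }
        if params == nil {
            params = []LiveParam{} // encode as [] rather than null
        }
        w.Header().Set("Content-Type", "application/json")
        json.NewEncoder(w).Encode(map[string]any{
            "serial_number": serialNumber,
            "count":         len(params),
            "monitoring":    params,
        })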
    }
}
```

---

## TASK 6 — go.mod cleanup

After all changes:

```bash
go mod tidy
```

This removes `github.com/redis/go-redis/v9` if no other file uses it.
No new dependencies are needed — only `net/http`, `net/url`, `sync`, `encoding/json` (all stdlib).

---

## TASK 7 — Verify build

```bash
go build ./...
```

Fix any compile errors. Common ones to watch for:
- Any remaining `*redis.Client` type references → change to `*RedisRESTClient`
- Any `redis.NewClient(...)` calls → change to `newRedisClient()`
- Any `.Result()` calls on go-redis methods → remove, use new REST methods
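- Missing imports in `progress_feed.go` → add only the packages the file actually uses (such as `context`, `encoding/json`, `net/http`, `time`); Go rejects unused imports, so add `strconv` only if something in the file calls it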

---

## Environment variables (add to your .env or deployment config)

```env
REDIS_API_BASE=http://3.108.55.133:8181
MONITORING_POLL_INTERVAL=15
```
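A minimal sketch of how the worker might read `MONITORING_POLL_INTERVAL`, assuming the value is a whole number of seconds (consistent with the `polling every 15s` log line expected later) and falling back to 15 seconds when it is unset or invalid. `pollIntervalFrom` is a hypothetical helper; the existing worker may already parse this differently.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
	"time"
)

// pollIntervalFrom converts a raw env value (seconds) into a duration.
// Empty, non-numeric, or non-positive input yields the 15s fallback.
// Hypothetical helper; the fallback value is an assumption.
func pollIntervalFrom(raw string) time.Duration {
	const fallback = 15 * time.Second
	secs, err := strconv.Atoi(raw)
	if err != nil || secs <= 0 {
		return fallback
	}
	return time.Duration(secs) * time.Second
}

func main() {
	interval := pollIntervalFrom(os.Getenv("MONITORING_POLL_INTERVAL"))
	fmt.Printf("[MonitoringWorker] started, polling every %s\n", interval)
}
```

If this helper lives in `monitoring_worker.go`, that is the file that needs the `strconv` import.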

---

## What stays UNCHANGED

- `TelemetryEnvelope` struct — do not modify
- `db.saveBatch()` — do not modify
- `telemetry`, `asset_latest`, `assets` table writes — do not modify
- All auth, CMMS, GIS, AI handlers — do not touch
- WebSocket hub and broadcast logic — do not touch
- Existing route registrations — only ADD the two new ones

---

## Expected result after completion

1. `go build ./...` compiles with zero errors
2. Server starts and logs:
   ```
   [MonitoringWorker] started — polling every 15s
   [MonitoringWorker] done in 420ms | keys=557 matched=541 unmatched=16 inserted=2734 skipped=89
   ```
3. `GET /api/monitoring/progress` returns last 20 ingest run logs
4. `GET /api/monitoring/live?serial_number=C%20Block%20Ranjeet%20Nagar_LT%20Panel_19` returns live parameter cards for that asset (serial number `C Block Ranjeet Nagar_LT Panel_19`, URL-encoded)
   - `serial_number` = `ic3_asset_master.serialnumber` = Redis key suffix = CMMS API `data.serialnumber`
   - NOT the Innomaint `asset_id` (73841) — that is an external ID, not an ic3_asset_master key
5. `monitoring: []` in the CMMS asset API response is now populated
