DuckDB + dbplyr: Când conducta dvs. oferă rezultate diferite de fiecare dată când rulează

URMĂREȘTE-NE
16,065FaniÎmi place
1,142CititoriConectați-vă

(Acest articol a fost publicat pentru prima dată pe Rsarcinași cu amabilitate a contribuit la R-bloggeri). (Puteți raporta problema legată de conținutul acestei pagini aici)


Doriți să vă distribuiți conținutul pe R-bloggeri? dați clic aici dacă aveți un blog, sau aici dacă nu aveți.

Puteți citi postarea originală în formatul său original pe site-ul web Rtask de ThinkR aici: DuckDB + dbplyr: Când conducta ta oferă rezultate diferite de fiecare dată când rulează

La scurt timp? Iată esenta: DuckDB paralelizează execuția interogărilor și nu garantează niciodată ordinea rândurilor cu excepția cazului în care o ceri în mod explicit. Dacă vreun pas din conducta dvs. este sensibil la comandă, row_number(), cumsum(), lag(), distinct(.keep_all = TRUE)inegalitatea se unește, produci în tăcere rezultate nedeterministe. Această postare arată cele patru modele care te mușcă și cum să le repari pe fiecare.


Configurarea: o conductă SAS, acum în R

Ați moștenit (sau ați scris) o conductă de date codificată inițial în SAS. Procesează înregistrările administrative de facturare: potrivirea elementelor rând cu tabelele de referință, aplicarea coeficienților care variază în timp, deduplicarea pe baza identificatorilor de afaceri, calculul contoarelor curente. Lucru ETL clasic.

Migrarea la R merge bine. Tu folosești {DBI} pentru a deschide o conexiune DuckDB, încărcați fișierele sursă ca tabele lene prin {arrow} sau dplyr::tbl()construiește transformările cu {dbplyr}și colectați rezultatul chiar la sfârșit. Codul dvs. este lizibil, testele dvs. compară ieșirea R cu referința SAS și trec (poate folosind {datadiff}).

Apoi rulați din nou conducta.

Cifrele sunt diferite.

Nu extrem de diferit. Câteva rânduri schimbate, câteva sume schimbate. Exact genul de diferență care ar trece printr-o verificare vizuală rapidă, dar ar sparge un raport de reconciliere. Mai rulezi de zece ori. Seven se potrivesc cu prima rundă. Trei se potrivesc cu al doilea. Acum vă uitați la non-determinismul intermitent, dependent de date, care este cel mai rău tip.

Această postare documentează cele patru cauze principale pe care le-am întâlnit în producție și modelele care le repar.


De ce DuckDB este diferit de ceea ce vă așteptați

În SAS, pasul de date procesează rândurile în ordinea fizică, în ordinea în care se află pe disc. Acea ordine este stabilă. Proceduri ca PROC SORT fă-l explicit. Întregul limbaj este construit în jurul ideii că ordinea rândurilor contează și este previzibilă.

DuckDB este un motor de interogare paralel, coloană. Împarte munca în nucleele CPU, procesează datele în bucăți (vectori) și reasambla rezultatele. Ordinea în care sunt procesate bucățile nu este garantată. Depinde de planul de interogare, de numărul de fire de execuție, de dimensiunea datelor și de deciziile interne de programare care se pot schimba între rulări.

Acesta nu este un bug. Este comportamentul așteptat al oricărei baze de date analitice moderne. Standardul SQL nu definește o ordine de rânduri decât dacă scrieți ORDER BY. DuckDB pur și simplu face acest lucru vizibil în moduri în care SQLite sau un cadru de date în memorie nu o fac, pentru că de fapt paralelizează.

Consecința pentru {dbplyr} utilizatori: orice cod R care implicit se bazează pe ordinea rândurilor, chiar dacă arată ca dplyr obișnuit, va produce rezultate imprevizibile atunci când este tradus în SQL și executat de DuckDB.


Sursa 1, Funcțiile ferestrei fără o comandă explicită

Acesta este cel mai comun vinovat.

Problema

# Looks fine. It isn't.
data |>
  group_by(entity_id) |>
  mutate(rn = row_number()) |>
  filter(rn == 1)

row_number() fără o clauză de ordine atribuie numere în orice ordine în care rândurile ajung la funcția fereastră. În DuckDB, această ordine este nedeterministă. Rândul pe care îl păstrați este aleatoriu.

Același lucru este valabil și pentru cumsum(), lag()și lead():

# cumsum() accumulates in random order if rows aren't sorted first
data |>
  group_by(entity_id, invoice_id, delay) |>
  mutate(counter = cumsum(code == "TYPE_A"))

# lag() reads the "previous" row, undefined if order is undefined
data |>
  group_by(code) |>
  mutate(prev_rate = lag(rate))

Remedierea: window_order() înainte de fiecare funcție de fereastră

dbplyr prevede window_order() a injecta o ORDER BY clauza din cadrul ferestrei. Cheia este că coloanele enumerate trebuie să fie în mod colectiv rupe toate legăturile în cadrul unui grup, în caz contrar, rândurile cu chei de sortare identice sunt încă procesate în ordine aleatorie.

# WRONG, all rows in the same group have identical values for these three columns
# The tie is never broken
data |>
  window_order(entity_id, invoice_id, delay) |>
  group_by(entity_id, invoice_id, delay) |>
  mutate(rn = row_number())

# CORRECT, row_id is unique per line and breaks every tie
data |>
  window_order(entity_id, invoice_id, delay, row_id) |>
  group_by(entity_id, invoice_id, delay) |>
  mutate(rn = row_number())

Regulă: cel window_order() cheia trebuie să includă cel puțin o coloană unică în cadrul grupului. Coloanele de group_by() singure nu sunt niciodată suficiente, sunt identice pentru fiecare rând din grup prin definiție.


Sursa 2, distinct(.keep_all = TRUE)

Problema

distinct() fără .keep_all este sigur: reține numai coloanele listate, care sunt identice pe toate rândurile care se potrivesc prin definiție. Dar .keep_all = TRUE cere DuckDB să returneze și fișierul alte coloane din unul din rândurile care se potrivesc și alege în mod arbitrar.

# If multiple rows share (client_id, product_id) with different amounts,
# the amount you get back is random
data |>
  distinct(client_id, product_id, .keep_all = TRUE)

# Adding a filter upstream doesn't save you if the filter can still
# return multiple rows per group
data |>
  group_by(client_id, product_id) |>
  filter(date == min(date, na.rm = TRUE)) |>  # ties on date → multiple rows
  ungroup() |>
  distinct(client_id, product_id, .keep_all = TRUE)   # ← still random

Opțiunea A: summarise() când aveți nevoie de o singură valoare agregată

data |>
  group_by(client_id, product_id) |>
  summarise(
    first_date = min(date, na.rm = TRUE),
    .groups = "drop"
  )

Opțiunea B: window_order() + filter(row_number() == 1L) când ai nevoie de tot rândul

data |>
  group_by(client_id, product_id) |>
  window_order(date, desc(amount)) |>   # explicit, deterministic choice
  filter(row_number() == 1L) |>
  ungroup()

A doua opțiune vă permite să vă exprimați care rândul pe care îl doriți de fapt, ceea ce este aproape întotdeauna ceea ce a intenționat logica de afaceri în primul rând.


Sursa 3, Uniri de inegalitate care creează un fan-out

Acesta este subtil și depinde de date, ceea ce îl face deosebit de periculos.

Problema

Un model comun în conductele de facturare este alăturarea unui tabel de tranzacții cu un tabel de referință cu rate sau coeficienți care variază în timp:

data |>
  left_join(
    ref_rates,
    by = join_by(code, date >= rate_start, date <= rate_end)
  )

Dacă ref_rates are două perioade de valabilitate suprapuse pentru aceeași codesă presupunem că un rând acoperă ianuarie–dec și altul acoperă iulie–dec pentru o valoare corectată, apoi fiecare tranzacție din acea perioadă se potrivește două rânduri în ref_rates. Îmbinarea dublează acele rânduri (fan-out ×2).

Acest fan-out se propagă apoi în tăcere prin fiecare pas din aval. Dvs cumsum() acumulează dublu. Dvs row_number() vede chei duplicate și devine nedeterminist chiar și cu a window_order() asta era suficient anterior.

Partea cea mai rea: acest lucru se manifestă doar pentru specific code valori care se întâmplă să aibă perioade suprapuse în datele dvs. de referință. Poate afecta o entitate din cincizeci, făcându-l să pară o problemă rară de calitate a datelor, mai degrabă decât o eroare structurală.

# Verify whether a fan-out has already occurred
data_after_join |>
  count(entity_id, line_id) |>
  filter(n > 1) |>
  collect()
# Non-empty → fan-out confirmed

Remedierea: pre-rezolvați până la (cheie × dată) înainte de echi-join

În loc să uniți tabelul complet de tranzacții cu referința cu o condiție de inegalitate, mai întâi creați o mică căutare care mapează fiecare pereche unică (cheie, dată) la exact un rând de referință:

# Step 1: find all unique (code, date) combinations present in the data
# Step 2: apply the inequality join only on this small lookup
# Step 3: deduplicate to one row per (code, date), choosing explicitly which period wins
# Step 4: join back to the full table with a simple equi-join, no fan-out possible

rates_resolved <- data |>
  distinct(code, date) |>
  left_join(
    ref_rates,
    by = join_by(code, date >= rate_start, date <= rate_end)
  ) |>
  group_by(code, date) |>
  window_order(desc(rate_start), desc(rate_end)) |>  # most recent period wins
  filter(row_number() == 1L) |>
  ungroup() |>
  select(-rate_start, -rate_end)

data <- data |>
  left_join(rates_resolved, by = c("code", "date"))  # equi-join, safe

Rețineți că ar trebui nu deduplicați tabelul de referință la nivel global după cheie înainte de unire. Acest lucru ar elimina perioadele istorice care nu se suprapun și care sunt încă valabile pentru alte date. Pre-rezolvarea trebuie să fie chirurgicală: rezolvați numai perechile în care mai multe perioade sunt valide simultan pentru o dată țintă dată.


Sursa 4, Rânduri sintetice care sunt perfect identice

Problema

Unele conducte extind rândurile pe baza unui câmp de cantitate: o linie de factură cu qty = 3 devine trei elemente rând separate. Dacă renunțați la indexul de expansiune după duplicare, cele trei rânduri devin duplicate perfecte, identice pe fiecare coloană. Nu window_order() le poate distinge.

# Expansion creates qty identical rows, then throws away the only discriminant
data |>
  slice(rep(seq_len(n()), times = qty)) |>
  select(-qty)   # ← now you have perfect duplicates

Orice funcție de fereastră din aval care operează pe aceste rânduri va produce rezultate arbitrare, deoarece motorul nu are nicio modalitate de a atribui numere sau ordine în mod determinist obiectelor care nu se pot distinge.

Soluție: păstrați indicele de expansiune ca departajare

# Keep the position within the expansion as a discriminant column
expanded <- data |>
  mutate(series = as.integer(series))   # position 1, 2, 3, …

# Include it in every downstream window_order
expanded |>
  window_order(entity_id, line_id, series) |>
  group_by(entity_id) |>
  mutate(rn = row_number())

# Drop it only at the very end of the pipeline, after all window operations
result |>
  select(-series)

Aceeași logică se aplică oricând tu union_all() tabele care ar putea conține rânduri identice: adăugați o etichetă sursă înainte de unire, astfel încât pașii din aval să o poată folosi ca departajare.


Bonus: Deduplicare în funcție de tip

O capcană asociată: atunci când un tabel conține mai multe tipuri de rânduri care partajează o coloană cheie, o singură trecere de deduplicare folosind contorul unui tip va elimina în tăcere rândurile celuilalt tip.

# records contains TYPE_A and TYPE_B rows sharing the same entity_id
# Deduplicating by (entity_id, counter_a) eliminates TYPE_B rows
# because counter_a is the same for both types within a given entity_id
records |>
  group_by(entity_id, counter_a) |>
  window_order(entity_id, counter_a, line_id) |>
  filter(row_number() == 1L) |>
  ungroup()

Soluția este de a împărți în ramuri și de a aplica contorul corect fiecărui tip:

records_a <- records |>
  filter(type != "TYPE_B") |>
  group_by(entity_id, counter_a) |>
  window_order(entity_id, counter_a, line_id) |>
  filter(row_number() == 1L) |>
  ungroup()

records_b <- records |>
  filter(type == "TYPE_B") |>
  group_by(entity_id, counter_b) |>
  window_order(entity_id, counter_b, line_id) |>
  filter(row_number() == 1L) |>
  ungroup()

records_final <- union_all(records_a, records_b)

Lista de verificare înainte de a expedia codul DuckDB/dbplyr

Copiați acest lucru în șablonul de examinare a codului:

Funcții ferestre
– ( ) Fiecare mutate(rn = row_number()) este precedat de window_order() cu o cheie care rupe toate legăturile din cadrul grupului
– ( ) Fiecare mutate(x = cumsum(...)) este precedat de window_order() care include cel puțin o coloană unică în cadrul grupului
– ( ) Fiecare mutate(prev = lag(...)) este precedat de un determinist window_order()
– ( ) Niciunul dintre window_order() coloanele sunt exclusiv group_by() coloane

distinct()
– ( ) Nu distinct(..., .keep_all = TRUE) este utilizat cu excepția cazului în care filtrul din amonte este garantat să returneze exact un rând pe grup
– ( ) Toate distinct(.keep_all = TRUE) au fost înlocuite cu summarise() sau window_order() + filter(row_number() == 1L)

Inegalitatea se alătură
– ( ) Fiecare join_by(key, date >= start, date <= end) este urmată de verificarea faptului că nu se suprapun două perioade din tabelul de referință pentru aceeași cheie
– ( ) Acolo unde este posibilă suprapunerea, se utilizează modelul de pre-rezoluție (cheie × data țintă) în locul unei îmbinări directe
– ( ) Deduplicarea după o îmbinare de inegalitate este pe (cheie × data țintă), nu numai pe (cheie)

Rânduri sintetice
– ( ) Fiecare slice(rep(...)) sau extinderea echivalentă păstrează o coloană index care poate fi utilizată ca departajare în aval window_order() apeluri
– ( ) Acea coloană de index este eliminată numai după ce toate operațiunile ferestrei sunt finalizate

Logica dependentă de tip
– ( ) Când logica deduplicarii diferă în funcție de tipul de rând, fiecare tip este procesat într-o ramură separată cu propriul contor de referință


Cum se detectează non-determinismul rezidual

Cea mai directă metodă: rulați conducta de mai multe ori și comparați producția agregată.

library(purrr)

runs <- map(1:8, function(i) {
  source("pipeline.R")
  result_table |>
    summarise(
      total_amount = sum(amount, na.rm = TRUE),
      n_rows = n()
    ) |>
    collect()
})

map_dfr(runs, identity)
# If total_amount or n_rows varies across the 8 runs → residual non-determinism

Dacă găsiți variații, căutați binar în pipeline: colectați tabele intermediare la mijlocul lanțului de transformare și rulați prima jumătate de N ori. Dacă punctul de mijloc este stabil, bug-ul este în a doua jumătate. Repetați până când izolați pasul în care apare prima variație.


Concluzie

DuckDB este un instrument excelent pentru acest gen de lucru, rapid, incorporabil, compatibil cu Arrow si Parchet, si se compune frumos cu {dbplyr}. Dar nu este un cadru de date cu sintaxă SQL. Este un motor de interogare paralel și va expune în tăcere fiecare presupunere pe care o face codul dvs. despre ordinea rândurilor.

Vestea bună: toate cele patru modele descrise aici pot fi reparate fără a vă restructura conducta. Regulile sunt simple odată ce le interiorizezi:

  1. Fiecare operație de fereastră sensibilă la comandă are nevoie de un explicit window_order() cu un adevărat departajare.
  2. distinct(.keep_all = TRUE) este un miros de cod în DuckDB, înlocuiți-l cu o alegere explicită.
  3. Adunările inegalității necesită un pas de pre-rezolvare dacă tabelul de referință poate avea perioade care se suprapun.
  4. Rândurile sintetice trebuie să-și păstreze indicele de expansiune până la sfârșit.

Partea dificilă este că niciunul dintre aceste erori nu se anunță. Codul rulează fără erori, testele transmit unele date, iar diferența dintre două rulări poate fi la fel de mică ca un rând la o mie. Singura apărare este revizuirea sistematică a codului față de lista de verificare de mai sus și rularea conductei de mai multe ori în timpul dezvoltării.

Această postare este mai bine prezentată pe site-ul său original ThinkR aici: DuckDB + dbplyr: Când conducta ta oferă rezultate diferite de fiecare dată când rulează

Dominic Botezariu
Dominic Botezariuhttps://www.noobz.ro/
Creator de site și redactor-șef.

Cele mai noi știri

Pe același subiect

LĂSAȚI UN MESAJ

Vă rugăm să introduceți comentariul dvs.!
Introduceți aici numele dvs.