(Acest articol a fost publicat pentru prima dată pe Rsarcinași cu amabilitate a contribuit la R-bloggeri). (Puteți raporta problema legată de conținutul acestei pagini aici)
Doriți să vă distribuiți conținutul pe R-bloggeri? dați clic aici dacă aveți un blog, sau aici dacă nu aveți.
Puteți citi postarea originală în formatul său original pe site-ul web Rtask de ThinkR aici: DuckDB + dbplyr: Când conducta ta oferă rezultate diferite de fiecare dată când rulează
La scurt timp? Iată esenta: DuckDB paralelizează execuția interogărilor și nu garantează niciodată ordinea rândurilor cu excepția cazului în care o ceri în mod explicit. Dacă vreun pas din conducta dvs. este sensibil la comandă,
row_number(),cumsum(),lag(),distinct(.keep_all = TRUE)inegalitatea se unește, produci în tăcere rezultate nedeterministe. Această postare arată cele patru modele care te mușcă și cum să le repari pe fiecare.
Configurarea: o conductă SAS, acum în R
Ați moștenit (sau ați scris) o conductă de date codificată inițial în SAS. Procesează înregistrările administrative de facturare: potrivirea elementelor rând cu tabelele de referință, aplicarea coeficienților care variază în timp, deduplicarea pe baza identificatorilor de afaceri, calculul contoarelor curente. Lucru ETL clasic.
Migrarea la R merge bine. Tu folosești {DBI} pentru a deschide o conexiune DuckDB, încărcați fișierele sursă ca tabele lene prin {arrow} sau dplyr::tbl()construiește transformările cu {dbplyr}și colectați rezultatul chiar la sfârșit. Codul dvs. este lizibil, testele dvs. compară ieșirea R cu referința SAS și trec (poate folosind {datadiff}).
Apoi rulați din nou conducta.
Cifrele sunt diferite.
Nu extrem de diferit. Câteva rânduri schimbate, câteva sume schimbate. Exact genul de diferență care ar trece printr-o verificare vizuală rapidă, dar ar sparge un raport de reconciliere. Mai rulezi de zece ori. Seven se potrivesc cu prima rundă. Trei se potrivesc cu al doilea. Acum vă uitați la non-determinismul intermitent, dependent de date, care este cel mai rău tip.
Această postare documentează cele patru cauze principale pe care le-am întâlnit în producție și modelele care le repar.
De ce DuckDB este diferit de ceea ce vă așteptați
În SAS, pasul de date procesează rândurile în ordinea fizică, în ordinea în care se află pe disc. Acea ordine este stabilă. Proceduri ca PROC SORT fă-l explicit. Întregul limbaj este construit în jurul ideii că ordinea rândurilor contează și este previzibilă.
DuckDB este un motor de interogare paralel, coloană. Împarte munca în nucleele CPU, procesează datele în bucăți (vectori) și reasambla rezultatele. Ordinea în care sunt procesate bucățile nu este garantată. Depinde de planul de interogare, de numărul de fire de execuție, de dimensiunea datelor și de deciziile interne de programare care se pot schimba între rulări.
Acesta nu este un bug. Este comportamentul așteptat al oricărei baze de date analitice moderne. Standardul SQL nu definește o ordine de rânduri decât dacă scrieți ORDER BY. DuckDB pur și simplu face acest lucru vizibil în moduri în care SQLite sau un cadru de date în memorie nu o fac, pentru că de fapt paralelizează.
Consecința pentru {dbplyr} utilizatori: orice cod R care implicit se bazează pe ordinea rândurilor, chiar dacă arată ca dplyr obișnuit, va produce rezultate imprevizibile atunci când este tradus în SQL și executat de DuckDB.
Sursa 1, Funcțiile ferestrei fără o comandă explicită
Acesta este cel mai comun vinovat.
Problema
# Looks fine. It isn't. data |> group_by(entity_id) |> mutate(rn = row_number()) |> filter(rn == 1)
row_number() fără o clauză de ordine atribuie numere în orice ordine în care rândurile ajung la funcția fereastră. În DuckDB, această ordine este nedeterministă. Rândul pe care îl păstrați este aleatoriu.
Același lucru este valabil și pentru cumsum(), lag()și lead():
# cumsum() accumulates in random order if rows aren't sorted first data |> group_by(entity_id, invoice_id, delay) |> mutate(counter = cumsum(code == "TYPE_A")) # lag() reads the "previous" row, undefined if order is undefined data |> group_by(code) |> mutate(prev_rate = lag(rate))
Remedierea: window_order() înainte de fiecare funcție de fereastră
dbplyr prevede window_order() a injecta o ORDER BY clauza din cadrul ferestrei. Cheia este că coloanele enumerate trebuie să fie în mod colectiv rupe toate legăturile în cadrul unui grup, în caz contrar, rândurile cu chei de sortare identice sunt încă procesate în ordine aleatorie.
# WRONG, all rows in the same group have identical values for these three columns # The tie is never broken data |> window_order(entity_id, invoice_id, delay) |> group_by(entity_id, invoice_id, delay) |> mutate(rn = row_number()) # CORRECT, row_id is unique per line and breaks every tie data |> window_order(entity_id, invoice_id, delay, row_id) |> group_by(entity_id, invoice_id, delay) |> mutate(rn = row_number())
Regulă: cel window_order() cheia trebuie să includă cel puțin o coloană unică în cadrul grupului. Coloanele de group_by() singure nu sunt niciodată suficiente, sunt identice pentru fiecare rând din grup prin definiție.
Sursa 2, distinct(.keep_all = TRUE)
Problema
distinct() fără .keep_all este sigur: reține numai coloanele listate, care sunt identice pe toate rândurile care se potrivesc prin definiție. Dar .keep_all = TRUE cere DuckDB să returneze și fișierul alte coloane din unul din rândurile care se potrivesc și alege în mod arbitrar.
# If multiple rows share (client_id, product_id) with different amounts, # the amount you get back is random data |> distinct(client_id, product_id, .keep_all = TRUE) # Adding a filter upstream doesn't save you if the filter can still # return multiple rows per group data |> group_by(client_id, product_id) |> filter(date == min(date, na.rm = TRUE)) |> # ties on date → multiple rows ungroup() |> distinct(client_id, product_id, .keep_all = TRUE) # ← still random
Opțiunea A: summarise() când aveți nevoie de o singură valoare agregată
data |>
group_by(client_id, product_id) |>
summarise(
first_date = min(date, na.rm = TRUE),
.groups = "drop"
)
Opțiunea B: window_order() + filter(row_number() == 1L) când ai nevoie de tot rândul
data |> group_by(client_id, product_id) |> window_order(date, desc(amount)) |> # explicit, deterministic choice filter(row_number() == 1L) |> ungroup()
A doua opțiune vă permite să vă exprimați care rândul pe care îl doriți de fapt, ceea ce este aproape întotdeauna ceea ce a intenționat logica de afaceri în primul rând.
Sursa 3, Uniri de inegalitate care creează un fan-out
Acesta este subtil și depinde de date, ceea ce îl face deosebit de periculos.
Problema
Un model comun în conductele de facturare este alăturarea unui tabel de tranzacții cu un tabel de referință cu rate sau coeficienți care variază în timp:
data |>
left_join(
ref_rates,
by = join_by(code, date >= rate_start, date <= rate_end)
)
Dacă ref_rates are două perioade de valabilitate suprapuse pentru aceeași codesă presupunem că un rând acoperă ianuarie–dec și altul acoperă iulie–dec pentru o valoare corectată, apoi fiecare tranzacție din acea perioadă se potrivește două rânduri în ref_rates. Îmbinarea dublează acele rânduri (fan-out ×2).
Acest fan-out se propagă apoi în tăcere prin fiecare pas din aval. Dvs cumsum() acumulează dublu. Dvs row_number() vede chei duplicate și devine nedeterminist chiar și cu a window_order() asta era suficient anterior.
Partea cea mai rea: acest lucru se manifestă doar pentru specific code valori care se întâmplă să aibă perioade suprapuse în datele dvs. de referință. Poate afecta o entitate din cincizeci, făcându-l să pară o problemă rară de calitate a datelor, mai degrabă decât o eroare structurală.
# Verify whether a fan-out has already occurred data_after_join |> count(entity_id, line_id) |> filter(n > 1) |> collect() # Non-empty → fan-out confirmed
Remedierea: pre-rezolvați până la (cheie × dată) înainte de echi-join
În loc să uniți tabelul complet de tranzacții cu referința cu o condiție de inegalitate, mai întâi creați o mică căutare care mapează fiecare pereche unică (cheie, dată) la exact un rând de referință:
# Step 1: find all unique (code, date) combinations present in the data
# Step 2: apply the inequality join only on this small lookup
# Step 3: deduplicate to one row per (code, date), choosing explicitly which period wins
# Step 4: join back to the full table with a simple equi-join, no fan-out possible
rates_resolved <- data |>
distinct(code, date) |>
left_join(
ref_rates,
by = join_by(code, date >= rate_start, date <= rate_end)
) |>
group_by(code, date) |>
window_order(desc(rate_start), desc(rate_end)) |> # most recent period wins
filter(row_number() == 1L) |>
ungroup() |>
select(-rate_start, -rate_end)
data <- data |>
left_join(rates_resolved, by = c("code", "date")) # equi-join, safe
Rețineți că ar trebui nu deduplicați tabelul de referință la nivel global după cheie înainte de unire. Acest lucru ar elimina perioadele istorice care nu se suprapun și care sunt încă valabile pentru alte date. Pre-rezolvarea trebuie să fie chirurgicală: rezolvați numai perechile în care mai multe perioade sunt valide simultan pentru o dată țintă dată.
Sursa 4, Rânduri sintetice care sunt perfect identice
Problema
Unele conducte extind rândurile pe baza unui câmp de cantitate: o linie de factură cu qty = 3 devine trei elemente rând separate. Dacă renunțați la indexul de expansiune după duplicare, cele trei rânduri devin duplicate perfecte, identice pe fiecare coloană. Nu window_order() le poate distinge.
# Expansion creates qty identical rows, then throws away the only discriminant data |> slice(rep(seq_len(n()), times = qty)) |> select(-qty) # ← now you have perfect duplicates
Orice funcție de fereastră din aval care operează pe aceste rânduri va produce rezultate arbitrare, deoarece motorul nu are nicio modalitate de a atribui numere sau ordine în mod determinist obiectelor care nu se pot distinge.
Soluție: păstrați indicele de expansiune ca departajare
# Keep the position within the expansion as a discriminant column expanded <- data |> mutate(series = as.integer(series)) # position 1, 2, 3, … # Include it in every downstream window_order expanded |> window_order(entity_id, line_id, series) |> group_by(entity_id) |> mutate(rn = row_number()) # Drop it only at the very end of the pipeline, after all window operations result |> select(-series)
Aceeași logică se aplică oricând tu union_all() tabele care ar putea conține rânduri identice: adăugați o etichetă sursă înainte de unire, astfel încât pașii din aval să o poată folosi ca departajare.
Bonus: Deduplicare în funcție de tip
O capcană asociată: atunci când un tabel conține mai multe tipuri de rânduri care partajează o coloană cheie, o singură trecere de deduplicare folosind contorul unui tip va elimina în tăcere rândurile celuilalt tip.
# records contains TYPE_A and TYPE_B rows sharing the same entity_id # Deduplicating by (entity_id, counter_a) eliminates TYPE_B rows # because counter_a is the same for both types within a given entity_id records |> group_by(entity_id, counter_a) |> window_order(entity_id, counter_a, line_id) |> filter(row_number() == 1L) |> ungroup()
Soluția este de a împărți în ramuri și de a aplica contorul corect fiecărui tip:
records_a <- records |> filter(type != "TYPE_B") |> group_by(entity_id, counter_a) |> window_order(entity_id, counter_a, line_id) |> filter(row_number() == 1L) |> ungroup() records_b <- records |> filter(type == "TYPE_B") |> group_by(entity_id, counter_b) |> window_order(entity_id, counter_b, line_id) |> filter(row_number() == 1L) |> ungroup() records_final <- union_all(records_a, records_b)
Lista de verificare înainte de a expedia codul DuckDB/dbplyr
Copiați acest lucru în șablonul de examinare a codului:
Funcții ferestre
– ( ) Fiecare mutate(rn = row_number()) este precedat de window_order() cu o cheie care rupe toate legăturile din cadrul grupului
– ( ) Fiecare mutate(x = cumsum(...)) este precedat de window_order() care include cel puțin o coloană unică în cadrul grupului
– ( ) Fiecare mutate(prev = lag(...)) este precedat de un determinist window_order()
– ( ) Niciunul dintre window_order() coloanele sunt exclusiv group_by() coloane
distinct()
– ( ) Nu distinct(..., .keep_all = TRUE) este utilizat cu excepția cazului în care filtrul din amonte este garantat să returneze exact un rând pe grup
– ( ) Toate distinct(.keep_all = TRUE) au fost înlocuite cu summarise() sau window_order() + filter(row_number() == 1L)
Inegalitatea se alătură
– ( ) Fiecare join_by(key, date >= start, date <= end) este urmată de verificarea faptului că nu se suprapun două perioade din tabelul de referință pentru aceeași cheie
– ( ) Acolo unde este posibilă suprapunerea, se utilizează modelul de pre-rezoluție (cheie × data țintă) în locul unei îmbinări directe
– ( ) Deduplicarea după o îmbinare de inegalitate este pe (cheie × data țintă), nu numai pe (cheie)
Rânduri sintetice
– ( ) Fiecare slice(rep(...)) sau extinderea echivalentă păstrează o coloană index care poate fi utilizată ca departajare în aval window_order() apeluri
– ( ) Acea coloană de index este eliminată numai după ce toate operațiunile ferestrei sunt finalizate
Logica dependentă de tip
– ( ) Când logica deduplicarii diferă în funcție de tipul de rând, fiecare tip este procesat într-o ramură separată cu propriul contor de referință
Cum se detectează non-determinismul rezidual
Cea mai directă metodă: rulați conducta de mai multe ori și comparați producția agregată.
library(purrr)
runs <- map(1:8, function(i) {
source("pipeline.R")
result_table |>
summarise(
total_amount = sum(amount, na.rm = TRUE),
n_rows = n()
) |>
collect()
})
map_dfr(runs, identity)
# If total_amount or n_rows varies across the 8 runs → residual non-determinism
Dacă găsiți variații, căutați binar în pipeline: colectați tabele intermediare la mijlocul lanțului de transformare și rulați prima jumătate de N ori. Dacă punctul de mijloc este stabil, bug-ul este în a doua jumătate. Repetați până când izolați pasul în care apare prima variație.
Concluzie
DuckDB este un instrument excelent pentru acest gen de lucru, rapid, incorporabil, compatibil cu Arrow si Parchet, si se compune frumos cu {dbplyr}. Dar nu este un cadru de date cu sintaxă SQL. Este un motor de interogare paralel și va expune în tăcere fiecare presupunere pe care o face codul dvs. despre ordinea rândurilor.
Vestea bună: toate cele patru modele descrise aici pot fi reparate fără a vă restructura conducta. Regulile sunt simple odată ce le interiorizezi:
- Fiecare operație de fereastră sensibilă la comandă are nevoie de un explicit
window_order()cu un adevărat departajare. distinct(.keep_all = TRUE)este un miros de cod în DuckDB, înlocuiți-l cu o alegere explicită.- Adunările inegalității necesită un pas de pre-rezolvare dacă tabelul de referință poate avea perioade care se suprapun.
- Rândurile sintetice trebuie să-și păstreze indicele de expansiune până la sfârșit.
Partea dificilă este că niciunul dintre aceste erori nu se anunță. Codul rulează fără erori, testele transmit unele date, iar diferența dintre două rulări poate fi la fel de mică ca un rând la o mie. Singura apărare este revizuirea sistematică a codului față de lista de verificare de mai sus și rularea conductei de mai multe ori în timpul dezvoltării.
Această postare este mai bine prezentată pe site-ul său original ThinkR aici: DuckDB + dbplyr: Când conducta ta oferă rezultate diferite de fiecare dată când rulează
